diff --git a/cmd/kar-controllers/app/options/options.go b/cmd/kar-controllers/app/options/options.go
index 0cd339e17..7d42288cc 100644
--- a/cmd/kar-controllers/app/options/options.go
+++ b/cmd/kar-controllers/app/options/options.go
@@ -1,19 +1,4 @@
 /*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-/*
 Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,6 +13,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
+
 package options
 
 import (
@@ -41,7 +27,6 @@ import (
 type ServerOption struct {
     Master        string
     Kubeconfig    string
-    SchedulerName string
     Dispatcher    bool
     AgentConfigs  string
     SecurePort    int
@@ -68,11 +53,10 @@ func (s *ServerOption) AddFlags(fs *flag.FlagSet) {
     // Set defaults via environment variables
     s.loadDefaultsFromEnvVars()
 
-    fs.StringVar(&s.Master, "scheduler", s.SchedulerName, "scheduler name for placing pods")
     fs.StringVar(&s.Master, "master", s.Master, "The address of the Kubernetes API server (overrides any value in kubeconfig)")
     fs.StringVar(&s.Kubeconfig, "kubeconfig", s.Kubeconfig, "Path to kubeconfig file with authorization and master location information.")
-    fs.BoolVar(&s.Dispatcher, "dispatcher", s.Dispatcher, "set dispather mode(true) or agent mode(false)")
-    fs.StringVar(&s.AgentConfigs, "agentconfigs", s.AgentConfigs, "Paths to agent config file:deploymentName separted by commas(,)")
+    fs.BoolVar(&s.Dispatcher, "dispatcher", s.Dispatcher, "set dispatcher mode (true) or agent mode (false)")
+    fs.StringVar(&s.AgentConfigs, "agentconfigs", s.AgentConfigs, "Comma-separated paths to agent config file:deploymentName")
     fs.BoolVar(&s.DynamicPriority, "dynamicpriority", s.DynamicPriority, "If true, set controller to use dynamic priority. If false, set controller to use static priority. Default is false.")
     fs.BoolVar(&s.Preemption, "preemption", s.Preemption, "Set controller to allow preemption if set to true. Note: when set to true, the Kubernetes Scheduler must be configured to enable preemption. Default is false.")
     fs.IntVar(&s.BackoffTime, "backofftime", s.BackoffTime, "Number of seconds a job will go away for, if it can not be scheduled. Default is 20.")
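Worth noting on the removal above: the dropped -scheduler flag was bound to &s.Master, the same variable as the -master flag, so the two flags silently clobbered each other depending on argument order. A self-contained sketch of that failure mode, with hypothetical values:

package main

import (
    "flag"
    "fmt"
)

func main() {
    fs := flag.NewFlagSet("kar-controllers", flag.ExitOnError)
    var master string
    // The removed line bound -scheduler to the same variable as -master,
    // so whichever flag appeared last on the command line won:
    fs.StringVar(&master, "scheduler", "", "scheduler name for placing pods")
    fs.StringVar(&master, "master", "", "The address of the Kubernetes API server")
    _ = fs.Parse([]string{"-master", "https://10.0.0.1:6443", "-scheduler", "oops"})
    fmt.Println(master) // "oops": the -scheduler value overwrote -master
}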
diff --git a/cmd/kar-controllers/app/server.go b/cmd/kar-controllers/app/server.go
index dc72179dd..4deb4ac14 100644
--- a/cmd/kar-controllers/app/server.go
+++ b/cmd/kar-controllers/app/server.go
@@ -1,19 +1,4 @@
 /*
-Copyright 2017 The Kubernetes Authors.
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-/*
 Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors.
 
 Licensed under the Apache License, Version 2.0 (the "License");
@@ -28,18 +13,23 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
+
 package app
 
 import (
+    "net/http"
+    "strings"
+
+    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
+    "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/clientcmd"
-    "net/http"
+    "k8s.io/utils/pointer"
 
     "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
+    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config"
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob"
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/health"
-
-    _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
 )
 
 func buildConfig(master, kubeconfig string) (*rest.Config, error) {
@@ -50,17 +40,29 @@ func buildConfig(master, kubeconfig string) (*rest.Config, error) {
 }
 
 func Run(opt *options.ServerOption) error {
-    config, err := buildConfig(opt.Master, opt.Kubeconfig)
+    restConfig, err := buildConfig(opt.Master, opt.Kubeconfig)
     if err != nil {
         return err
     }
 
     neverStop := make(chan struct{})
 
-    config.QPS = 100.0
-    config.Burst = 200.0
+    restConfig.QPS = 100.0
+    restConfig.Burst = 200.0
+
+    mcadConfig := &config.MCADConfiguration{
+        DynamicPriority:       pointer.Bool(opt.DynamicPriority),
+        Preemption:            pointer.Bool(opt.Preemption),
+        BackoffTime:           pointer.Int32(int32(opt.BackoffTime)),
+        HeadOfLineHoldingTime: pointer.Int32(int32(opt.HeadOfLineHoldingTime)),
+        QuotaEnabled:          &opt.QuotaEnabled,
+    }
+    extConfig := &config.MCADConfigurationExtended{
+        Dispatcher:   pointer.Bool(opt.Dispatcher),
+        AgentConfigs: strings.Split(opt.AgentConfigs, ","),
+    }
 
-    jobctrl := queuejob.NewJobController(config, opt)
+    jobctrl := queuejob.NewJobController(restConfig, mcadConfig, extConfig)
     if jobctrl == nil {
         return nil
     }
@@ -86,4 +88,3 @@ func listenHealthProbe(opt *options.ServerOption) error {
 
     return nil
 }
-
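Aside on the wiring above: the k8s.io/utils/pointer helpers return a pointer to a copy of their argument, which is how the plain flag values end up in the optional, pointer-typed fields of the new config structs. A minimal runnable sketch of the pattern (the struct below is a stand-in for illustration, not the actual config package):

package main

import (
    "fmt"

    "k8s.io/utils/pointer"
)

// mcadConfiguration stands in for config.MCADConfiguration.
type mcadConfiguration struct {
    DynamicPriority *bool
    BackoffTime     *int32
}

func main() {
    backoff := 20 // e.g., the parsed -backofftime flag value
    cfg := mcadConfiguration{
        DynamicPriority: pointer.Bool(false),          // pointer to a copy of false
        BackoffTime:     pointer.Int32(int32(backoff)), // pointer to a copy of 20
    }
    fmt.Println(*cfg.DynamicPriority, *cfg.BackoffTime) // false 20
}

Taking a field's address directly, as the patch does with &opt.QuotaEnabled, aliases the options struct instead of copying; for read-only use while the options outlive the config, the two are interchangeable.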
diff --git a/pkg/config/config.go b/pkg/config/config.go
new file mode 100644
index 000000000..9508a6193
--- /dev/null
+++ b/pkg/config/config.go
@@ -0,0 +1,60 @@
+/*
+Copyright 2023 The Multi-Cluster App Dispatcher Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package config
+
+// MCADConfiguration defines the core MCAD configuration.
+type MCADConfiguration struct {
+    // dynamicPriority sets the controller to use dynamic priority.
+    // If false, it sets the controller to use static priority.
+    // It defaults to false.
+    // +optional
+    DynamicPriority *bool `json:"dynamicPriority,omitempty"`
+
+    // preemption sets the controller to allow preemption.
+    // Note: when set to true, the Kubernetes scheduler must be configured
+    // to enable preemption. It defaults to false.
+    // +optional
+    Preemption *bool `json:"preemption,omitempty"`
+
+    // backoffTime defines the duration, in seconds, a job is backed off
+    // if it cannot be scheduled. It defaults to 20.
+    // +optional
+    BackoffTime *int32 `json:"backoffTime,omitempty"`
+
+    // headOfLineHoldingTime defines the duration, in seconds, a job can stay at the
+    // head of the line without being bumped.
+    // It defaults to 0.
+    // +optional
+    HeadOfLineHoldingTime *int32 `json:"headOfLineHoldingTime,omitempty"`
+
+    // quotaEnabled sets whether quota management is enabled.
+    // It defaults to false.
+    // +optional
+    QuotaEnabled *bool `json:"quotaEnabled,omitempty"`
+}
+
+// MCADConfigurationExtended defines the extended MCAD configuration, e.g.,
+// for experimental features.
+type MCADConfigurationExtended struct {
+    // dispatcher sets the controller in dispatcher mode, or in agent mode.
+    // It defaults to false.
+    // +optional
+    Dispatcher *bool `json:"dispatcher,omitempty"`
+
+    // agentConfigs contains the paths to the agent config files.
+    AgentConfigs []string `json:"agentConfigs,omitempty"`
+}
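The pointer-typed, omitempty fields above let the configuration distinguish "unset" from "explicitly false or zero": a nil field means the default applies, and it is dropped when the struct is serialized. The support.go file added next layers nil-safe accessors on top of this. A small runnable sketch of both behaviors (stand-in types, not the actual package):

package main

import (
    "encoding/json"
    "fmt"
)

// mcadConfiguration stands in for config.MCADConfiguration with two of its fields.
type mcadConfiguration struct {
    DynamicPriority *bool  `json:"dynamicPriority,omitempty"`
    BackoffTime     *int32 `json:"backoffTime,omitempty"`
}

// backoffTimeOrDefault mirrors the nil-safe accessor pattern of support.go.
func (c *mcadConfiguration) backoffTimeOrDefault(val int32) int32 {
    if c.BackoffTime == nil {
        return val
    }
    return *c.BackoffTime
}

func main() {
    t := int32(30)
    partial := mcadConfiguration{BackoffTime: &t} // DynamicPriority left unset (nil)
    out, _ := json.Marshal(partial)
    fmt.Println(string(out)) // {"backoffTime":30}; nil fields are omitted

    var empty mcadConfiguration
    fmt.Println(empty.backoffTimeOrDefault(20)) // 20, the caller-supplied default
}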
diff --git a/pkg/config/support.go b/pkg/config/support.go
new file mode 100644
index 000000000..c745018bd
--- /dev/null
+++ b/pkg/config/support.go
@@ -0,0 +1,49 @@
+/*
+Copyright 2023 The Multi-Cluster App Dispatcher Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package config
+
+// IsQuotaEnabled returns whether quota management is enabled.
+func (c *MCADConfiguration) IsQuotaEnabled() bool {
+    return isTrue(c.QuotaEnabled)
+}
+
+// HasPreemption returns whether preemption is enabled.
+func (c *MCADConfiguration) HasPreemption() bool {
+    return isTrue(c.Preemption)
+}
+
+// HasDynamicPriority returns whether dynamic priority is enabled.
+func (c *MCADConfiguration) HasDynamicPriority() bool {
+    return isTrue(c.DynamicPriority)
+}
+
+// BackoffTimeOrDefault returns the configured backoff time, or val if unset.
+func (c *MCADConfiguration) BackoffTimeOrDefault(val int32) int32 {
+    if c.BackoffTime == nil {
+        return val
+    }
+    return *c.BackoffTime
+}
+
+// IsDispatcher returns whether the controller runs in dispatcher mode.
+func (e *MCADConfigurationExtended) IsDispatcher() bool {
+    return isTrue(e.Dispatcher)
+}
+
+func isTrue(v *bool) bool {
+    return v != nil && *v
+}
diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go
index f17ad0587..7b7a80a9b 100644
--- a/pkg/controller/queuejob/queuejob_controller_ex.go
+++ b/pkg/controller/queuejob/queuejob_controller_ex.go
@@ -32,48 +32,43 @@ import (
     "github.com/eapache/go-resiliency/retrier"
     "github.com/hashicorp/go-multierror"
-    qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"
-
-    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager"
     dto "github.com/prometheus/client_model/go"
 
-    "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
+    arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
+    clientset "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/clientset/versioned"
+    informerFactory "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/informers/externalversions"
+    arbinformers "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/informers/externalversions/controller/v1beta1"
+    arblisters "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/listers/controller/v1beta1"
+    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config"
+    clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/metrics/adapter"
+    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobdispatch"
+    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource"
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota"
+    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager"
+    qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"
+
+    v1 "k8s.io/api/core/v1"
     "k8s.io/apimachinery/pkg/api/equality"
     apierrors "k8s.io/apimachinery/pkg/api/errors"
-    "k8s.io/apimachinery/pkg/api/meta"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+    "k8s.io/apimachinery/pkg/labels"
     "k8s.io/apimachinery/pkg/util/wait"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/rest"
     "k8s.io/client-go/tools/cache"
     "k8s.io/klog/v2"
-
-    v1 "k8s.io/api/core/v1"
-
-    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource"
-    "k8s.io/apimachinery/pkg/labels"
-
-    arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
-    clientset "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/clientset/versioned"
-
-    informerFactory "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/informers/externalversions"
-    arbinformers "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/informers/externalversions/controller/v1beta1"
-
-    arblisters "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/listers/controller/v1beta1"
-
-    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobdispatch"
-
-    clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
 )
 
+// defaultBackoffTime is the default backoff time in seconds
+const defaultBackoffTime = 20
+
 // XController the AppWrapper Controller type
 type XController struct {
-    config       *rest.Config
-    serverOption *options.ServerOption
+    // MCAD configuration
+    config config.MCADConfiguration
 
     appwrapperInformer arbinformers.AppWrapperInformer
     // resources registered for the AppWrapper
@@ -137,11 +132,6 @@ type JobAndClusterAgent struct {
     queueJobAgentKey string
 }
 
-// RegisterAllQueueJobResourceTypes - registers all resources
-// func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources) {
-//     respod.Register(regs)
-// }
-
 func GetQueueJobKey(obj interface{}) (string, error) {
     qj, ok := obj.(*arbv1.AppWrapper)
     if !ok {
@@ -154,7 +144,6 @@ func GetQueueJobKey(obj interface{}) (string, error) {
 // UpdateQueueJobStatus was part of pod informer, this is now a method of queuejob_controller file.
 // This change is done in an effort to simplify the controller and enable to move to controller runtime.
 func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error {
-
     labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", queuejob.Name)
     pods, errt := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
     if errt != nil {
@@ -196,7 +185,7 @@ func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error {
 // consumed by existing pods.
 // For a large cluster with thousands of nodes and hundreds of thousands of pods this
 // method could be a performance bottleneck
-// We can then move this method to a seperate thread that basically runs every X interval and
+// We can then move this method to a separate thread that basically runs every X interval and
 // provides resources available to the next AW that needs to be dispatched.
 // Obviously the thread would need locking and timer to expire cache.
 // May be moved to controller runtime can help.
@@ -233,12 +222,11 @@ func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource {
 }
 
 // NewJobController create new AppWrapper Controller
-func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
+func NewJobController(restConfig *rest.Config, mcadConfig *config.MCADConfiguration, extConfig *config.MCADConfigurationExtended) *XController {
     cc := &XController{
-        config:          config,
-        serverOption:    serverOption,
-        clients:         kubernetes.NewForConfigOrDie(config),
-        arbclients:      clientset.NewForConfigOrDie(config),
+        config:          *mcadConfig,
+        clients:         kubernetes.NewForConfigOrDie(restConfig),
+        arbclients:      clientset.NewForConfigOrDie(restConfig),
         eventQueue:      cache.NewFIFO(GetQueueJobKey),
         agentEventQueue: cache.NewFIFO(GetQueueJobKey),
         // initQueue:    cache.NewFIFO(GetQueueJobKey),
@@ -254,24 +242,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
     // multi-cluster mode, so for now it is turned-off: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/585
     // cc.metricsAdapter = adapter.New(serverOption, config, cc.cache)
 
-    cc.genericresources = genericresource.NewAppWrapperGenericResource(config)
-
-    // cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
-    // RegisterAllQueueJobResourceTypes(&cc.qjobRegisteredResources)
+    cc.genericresources = genericresource.NewAppWrapperGenericResource(restConfig)
 
-    // initialize pod sub-resource control
-    // resControlPod, found, err := cc.qjobRegisteredResources.InitQueueJobResource(arbv1.ResourceTypePod, config)
-    // if err != nil {
-    //     klog.Errorf("fail to create queuejob resource control")
-    //     return nil
-    // }
-    // if !found {
-    //     klog.Errorf("queuejob resource type Pod not found")
-    //     return nil
-    // }
-    // cc.qjobResControls[arbv1.ResourceTypePod] = resControlPod
-
-    appWrapperClient, err := clientset.NewForConfig(cc.config)
+    appWrapperClient, err := clientset.NewForConfig(restConfig)
     if err != nil {
         klog.Fatalf("Could not instantiate k8s client, err=%v", err)
     }
@@ -303,10 +276,10 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
     cc.appWrapperSynced = cc.appwrapperInformer.Informer().HasSynced
 
     // Setup Quota
-    if serverOption.QuotaEnabled {
-        dispatchedAWDemands, dispatchedAWs := cc.getDispatchedAppWrappers()
+    if mcadConfig.IsQuotaEnabled() {
+        dispatchedAWDemands, dispatchedAWs := cc.getDispatchedAppWrappers(restConfig)
         cc.quotaManager, err = quotaforestmanager.NewQuotaManager(dispatchedAWDemands, dispatchedAWs, cc.appWrapperLister,
-            config, serverOption)
+            restConfig, mcadConfig)
         if err != nil {
             klog.Error("Failed to instantiate quota manager: %#v", err)
             return nil
@@ -316,7 +289,7 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
     }
 
     // Set dispatcher mode or agent mode
-    cc.isDispatcher = serverOption.Dispatcher
+    cc.isDispatcher = extConfig.IsDispatcher()
     if cc.isDispatcher {
         klog.Infof("[Controller] Dispatcher mode")
     } else {
@@ -326,9 +299,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *XController {
     // create agents and agentMap
     cc.agentMap = map[string]*queuejobdispatch.JobClusterAgent{}
     cc.agentList = []string{}
-    for _, agentconfig := range strings.Split(serverOption.AgentConfigs, ",") {
-        agentData := strings.Split(agentconfig, ":")
-        jobClusterAgent := queuejobdispatch.NewJobClusterAgent(agentconfig, cc.agentEventQueue)
+    for _, agentConfig := range extConfig.AgentConfigs {
+        agentData := strings.Split(agentConfig, ":")
+        jobClusterAgent := queuejobdispatch.NewJobClusterAgent(agentConfig, cc.agentEventQueue)
         if jobClusterAgent != nil {
             cc.agentMap["/root/kubernetes/"+agentData[0]] = jobClusterAgent
             cc.agentList = append(cc.agentList, "/root/kubernetes/"+agentData[0])
@@ -352,8 +325,6 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
     ctx := context.Background()
     aw := qjm.GetQueueJobEligibleForPreemption(inspectAw)
     if aw != nil {
-
-        //for _, aw := range qjobs {
         if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed {
             return
         }
@@ -365,7 +336,7 @@ func (qjm *XController) PreemptQueueJobs(inspectAw *arbv1.AppWrapper) {
             klog.Warningf("[PreemptQueueJobs] failed in retrieving a fresh copy of the app wrapper '%s/%s', err=%v. Will try to preempt on the next run.", aw.Namespace, aw.Name, err)
             return
         }
-        //we need to update AW before analyzing it as a candidate for preemption
+        // we need to update AW before analyzing it as a candidate for preemption
         updateErr := qjm.UpdateQueueJobStatus(newjob)
         if updateErr != nil {
             klog.Warningf("[PreemptQueueJobs] update of pod count to AW %v failed hence skipping preemption", newjob.Name)
@@ -726,11 +697,11 @@ func (qjm *XController) getProposedPreemptions(requestingJob *arbv1.AppWrapper,
     return proposedPreemptions
 }
 
-func (qjm *XController) getDispatchedAppWrappers() (map[string]*clusterstateapi.Resource, map[string]*arbv1.AppWrapper) {
+func (qjm *XController) getDispatchedAppWrappers(restConfig *rest.Config) (map[string]*clusterstateapi.Resource, map[string]*arbv1.AppWrapper) {
     awrRetVal := make(map[string]*clusterstateapi.Resource)
     awsRetVal := make(map[string]*arbv1.AppWrapper)
     // Setup and break down an informer to get a list of appwrappers bofore controllerinitialization completes
-    appWrapperClient, err := clientset.NewForConfig(qjm.config)
+    appWrapperClient, err := clientset.NewForConfig(restConfig)
     if err != nil {
         klog.Errorf("[getDispatchedAppWrappers] Failure creating client for initialization informer err=%#v", err)
         return awrRetVal, awsRetVal
@@ -923,7 +894,7 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) string {
         klog.V(2).Infof("[chooseAgent] Agent %s has enough resources\n", agentId)
 
         // Now evaluate quota
-        if qjm.serverOption.QuotaEnabled {
+        if qjm.config.IsQuotaEnabled() {
             if qjm.quotaManager != nil {
                 if fits, preemptAWs, _ := qjm.quotaManager.Fits(qj, qjAggrResources, nil, proposedPreemptions); fits {
                     klog.V(2).Infof("[chooseAgent] AppWrapper %s has enough quota.\n", qj.Name)
@@ -1031,7 +1002,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
     }
 
     // Re-compute SystemPriority for DynamicPriority policy
-    if qjm.serverOption.DynamicPriority {
+    if qjm.config.HasDynamicPriority() {
         klog.V(4).Info("[ScheduleNext] dynamic priority enabled")
         // Create newHeap to temporarily store qjqueue jobs for updating SystemPriority
         tempQ := newHeap(cache.MetaNamespaceKeyFunc, HigherSystemPriorityQJ)
@@ -1160,11 +1131,11 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
             klog.V(4).Infof("[ScheduleNext] [Agent Mode] Forwarding loop iteration: %d", fowardingLoopCount)
             priorityindex := qj.Status.SystemPriority
             // Support for Non-Preemption
-            if !qjm.serverOption.Preemption {
+            if !qjm.config.HasPreemption() {
                 priorityindex = -math.MaxFloat64
             }
             // Disable Preemption under DynamicPriority. Comment out if allow DynamicPriority and Preemption at the same time.
-            if qjm.serverOption.DynamicPriority {
+            if qjm.config.HasDynamicPriority() {
                 priorityindex = -math.MaxFloat64
             }
             // cache now is a method inside the controller.
@@ -1185,7 +1156,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) {
         // Cluster resources need to be considered to determine if both quota and resources (after deleting borrowing AppWrappers) are availabe for the new AppWrapper
         // We perform a "quota check" first followed by a "resource check"
         fits := true
-        if qjm.serverOption.QuotaEnabled {
+        if qjm.config.IsQuotaEnabled() {
             if qjm.quotaManager != nil {
                 // Quota tree design:
                 // - All AppWrappers without quota submission will consume quota from the 'default' node.
@@ -1472,12 +1443,12 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason string, message string) {
     }
     qjm.qjqueue.AddUnschedulableIfNotPresent(q)
     klog.V(3).Infof("[backoff] %s move to unschedulableQ before sleep for %d seconds. activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", q.Name,
-        qjm.serverOption.BackoffTime, qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q, q.ResourceVersion, q.Status)
-    time.Sleep(time.Duration(qjm.serverOption.BackoffTime) * time.Second)
+        qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q, q.ResourceVersion, q.Status)
+    time.Sleep(time.Duration(qjm.config.BackoffTimeOrDefault(defaultBackoffTime)) * time.Second)
     qjm.qjqueue.MoveToActiveQueueIfExists(q)
     klog.V(3).Infof("[backoff] '%s/%s' activeQ Add after sleep for %d seconds. activeQ=%t Unsched=%t &aw=%p Version=%s Status=%+v", q.Namespace, q.Name,
-        qjm.serverOption.BackoffTime, qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q, q.ResourceVersion, q.Status)
+        qjm.config.BackoffTimeOrDefault(defaultBackoffTime), qjm.qjqueue.IfExistActiveQ(q), qjm.qjqueue.IfExistUnschedulableQ(q), q, q.ResourceVersion, q.Status)
 }
 
 // Run starts AppWrapper Controller
@@ -1514,7 +1485,7 @@ func (qjm *XController) UpdateQueueJobs(newjob *arbv1.AppWrapper) {
     err := qjm.UpdateQueueJobStatus(newjob)
     if err != nil {
         klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
-        //TODO: should we really return?
+        // TODO: should we really return?
         return
     }
     klog.V(6).Infof("[UpdateQueueJobs] %s: qjqueue=%t &qj=%p Version=%s Status=%+v", newjob.Name, qjm.qjqueue.IfExist(newjob), newjob, newjob.ResourceVersion, newjob.Status)
@@ -1615,22 +1586,22 @@ func (cc *XController) addQueueJob(obj interface{}) {
     klog.V(6).Infof("[Informer-addQJ] enqueue %s &qj=%p Version=%s Status=%+v", qj.Name, qj, qj.ResourceVersion, qj.Status)
 
     // Requeue the item to be processed again in 30 seconds.
-    //TODO: tune the frequency of reprocessing an AW
+    // TODO: tune the frequency of reprocessing an AW
     hasCompletionStatus := false
     for _, genericItem := range qj.Spec.AggrResources.GenericItems {
         if len(genericItem.CompletionStatus) > 0 {
             hasCompletionStatus = true
         }
     }
-    //When an AW entrs a system with completionstatus keep checking the AW until completed
-    //updatequeuejobs now runs as a part of informer machinery. optimization here is to not use etcd to pullout submitted AWs and operate
-    //on stale AWs. This has potential to improve performance at scale.
+    // When an AW enters the system with completionStatus set, keep checking the AW until it completes.
+    // UpdateQueueJobs now runs as part of the informer machinery. An optimization here is to not use etcd to pull out submitted AWs and operate
+    // on stale AWs. This has the potential to improve performance at scale.
     if hasCompletionStatus {
         requeueInterval := 5 * time.Second
         key, err := cache.MetaNamespaceKeyFunc(qj)
         if err != nil {
             klog.Warningf("[Informer-addQJ] Error getting AW %s from cache cannot determine completion status", qj.Name)
-            //TODO: should we return from this loop?
+            // TODO: should we return from this loop?
         }
         go func() {
             for {
@@ -1647,7 +1618,7 @@ func (cc *XController) addQueueJob(obj interface{}) {
                 }
                 if latestAw.Status.State != arbv1.AppWrapperStateActive && latestAw.Status.State != arbv1.AppWrapperStateEnqueued && latestAw.Status.State != arbv1.AppWrapperStateRunningHoldCompletion {
                     klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.Name, latestAw.Status.State)
-                    break //Exit the loop
+                    break // Exit the loop
                 }
                 // Enqueue the latest copy of the AW.
                 if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && hasCompletionStatus {
@@ -1666,7 +1637,7 @@ func (cc *XController) addQueueJob(obj interface{}) {
     key, err := cache.MetaNamespaceKeyFunc(qj)
     if err != nil {
         klog.Errorf("[Informer-addQJ] Error getting AW %s from cache cannot preempt AW", qj.Name)
-        //TODO: should we return from this loop?
+        // TODO: should we return from this loop?
     }
     go func() {
         for {
@@ -1683,7 +1654,7 @@ func (cc *XController) addQueueJob(obj interface{}) {
             }
             if latestAw.Status.State != arbv1.AppWrapperStateActive && latestAw.Status.State != arbv1.AppWrapperStateEnqueued && latestAw.Status.State != arbv1.AppWrapperStateRunningHoldCompletion {
                 klog.V(2).Infof("[Informer-addQJ] Stopping requeue for AW %s with status %s", latestAw.Name, latestAw.Status.State)
-                break //Exit the loop
+                break // Exit the loop
             }
             // Enqueue the latest copy of the AW.
             if (qj.Status.State != arbv1.AppWrapperStateCompleted && qj.Status.State != arbv1.AppWrapperStateFailed) && (qj.Spec.SchedSpec.MinAvailable > 0) {
@@ -1723,10 +1694,10 @@ func (cc *XController) updateQueueJob(oldObj, newObj interface{}) {
     notBackedoff := true
     for _, cond := range newQJ.Status.Conditions {
         if cond.Type == arbv1.AppWrapperCondBackoff {
-            //AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue.
-            //TODO: we could plug an interface here with back-off strategies for different MCAD use cases.
-            time.AfterFunc(time.Duration(cc.serverOption.BackoffTime)*time.Second, func() {
-                if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
+            // AWs that have backoff conditions have a delay of 10 seconds before getting added to enqueue.
+            // TODO: we could plug an interface here with back-off strategies for different MCAD use cases.
+            time.AfterFunc(time.Duration(cc.config.BackoffTimeOrDefault(defaultBackoffTime))*time.Second, func() {
+                if cc.config.IsQuotaEnabled() && cc.quotaManager != nil {
                     cc.quotaManager.Release(newQJ)
                 }
                 cc.enqueue(newQJ)
@@ -1766,7 +1737,7 @@ func (cc *XController) deleteQueueJob(obj interface{}) {
         return
     }
     // we delete the job from the queue if it is there, ignoring errors
-    if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
+    if cc.config.IsQuotaEnabled() && cc.quotaManager != nil {
         cc.quotaManager.Release(qj)
     }
     cc.qjqueue.Delete(qj)
@@ -2245,7 +2216,7 @@ func (cc *XController) Cleanup(ctx context.Context, appwrapper *arbv1.AppWrapper) error {
     }
 
     // Release quota if quota is enabled and quota manager instance exists
-    if cc.serverOption.QuotaEnabled && cc.quotaManager != nil {
+    if cc.config.IsQuotaEnabled() && cc.quotaManager != nil {
         cc.quotaManager.Release(appwrapper)
     }
     appwrapper.Status.Pending = 0
diff --git a/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go b/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go
index e662911f8..411b6b6d1 100644
--- a/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go
+++ b/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go
@@ -20,24 +20,23 @@ package quotaforestmanager
 import (
     "bytes"
     "fmt"
+    "math"
+    "reflect"
     "strings"
 
     "github.com/hashicorp/go-multierror"
-    "github.com/project-codeflare/multi-cluster-app-dispatcher/cmd/kar-controllers/app/options"
     arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
     listersv1beta1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/listers/controller/v1beta1"
+    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/config"
     clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
+    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource"
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota"
     qstmanager "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr/quotasubtmgr"
     qmbackend "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota"
     qmbackendutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/utils"
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"
 
-    "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource"
-    "k8s.io/client-go/rest"
-
-    "math"
-    "reflect"
+    "k8s.io/client-go/rest"
     "k8s.io/klog/v2"
 )
 
@@ -109,20 +108,19 @@ func getDispatchedAppWrapper(dispatchedAWs map[string]*arbv1.AppWrapper, awId string) *arbv1.AppWrapper {
 }
 
 func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, dispatchedAWs map[string]*arbv1.AppWrapper,
-    awJobLister listersv1beta1.AppWrapperLister, config *rest.Config, serverOptions *options.ServerOption) (*QuotaManager, error) {
+    awJobLister listersv1beta1.AppWrapperLister, restConfig *rest.Config, mcadConfig *config.MCADConfiguration) (*QuotaManager, error) {
 
     var err error
 
-    if !serverOptions.QuotaEnabled {
+    if !mcadConfig.IsQuotaEnabled() {
         klog.Infof("[NewQuotaManager] Quota management is not enabled.")
         return nil, nil
     }
 
     qm := &QuotaManager{
-        url:                 serverOptions.QuotaRestURL,
         appwrapperLister:    awJobLister,
-        preemptionEnabled:   serverOptions.Preemption,
+        preemptionEnabled:   mcadConfig.HasPreemption(),
         quotaManagerBackend: qmbackend.NewManager(),
         initializationDone:  false,
     }
@@ -132,7 +130,7 @@ func NewQuotaManager(dispatchedAWDemands map[string]*clusterstateapi.Resource, dispatchedAWs map[string]*arbv1.AppWrapper,
     klog.V(4).Infof("[NewQuotaManager] Before initialization QuotaSubtree informer - %s", qm.quotaManagerBackend.String())
 
     // Create a resource plan manager
-    qm.quotaSubtreeManager, err = qstmanager.NewQuotaSubtreeManager(config, qm.quotaManagerBackend)
+    qm.quotaSubtreeManager, err = qstmanager.NewQuotaSubtreeManager(restConfig, qm.quotaManagerBackend)
     if err != nil {
         klog.Errorf("[NewQuotaManager] failed to instantiate new quota subtree manager, err=%#v", err)
         return nil, err