diff --git a/pkg/cmd/server/kubernetes/master/controller/config.go b/pkg/cmd/server/kubernetes/master/controller/config.go index 0e330bad483b..a2096ad10d2a 100644 --- a/pkg/cmd/server/kubernetes/master/controller/config.go +++ b/pkg/cmd/server/kubernetes/master/controller/config.go @@ -1,19 +1,9 @@ package controller import ( - "fmt" - "io/ioutil" - "os" - - "k8s.io/apimachinery/pkg/runtime" - kerrors "k8s.io/apimachinery/pkg/util/errors" kubecontroller "k8s.io/kubernetes/cmd/kube-controller-manager/app" - scheduleroptions "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" - schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api" - latestschedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api/latest" configapi "github.com/openshift/origin/pkg/cmd/server/api" - cmdflags "github.com/openshift/origin/pkg/cmd/util/flags" "github.com/openshift/origin/pkg/cmd/util/variable" "k8s.io/kubernetes/pkg/volume" ) @@ -22,9 +12,6 @@ import ( // launch the set of kube (not openshift) controllers. type KubeControllerConfig struct { HorizontalPodAutoscalerControllerConfig HorizontalPodAutoscalerControllerConfig - - // TODO the scheduler should move out into its own logical component - SchedulerControllerConfig SchedulerControllerConfig } // GetControllerInitializers return the controller initializer functions for kube controllers @@ -36,48 +23,14 @@ func (c KubeControllerConfig) GetControllerInitializers() (map[string]kubecontro // in openshift-infra, and pass it a scale client that knows how to scale DCs ret["horizontalpodautoscaling"] = c.HorizontalPodAutoscalerControllerConfig.RunController - // FIXME: Move this under openshift controller intialization once we figure out - // deployment (options). - ret["openshift.io/scheduler"] = c.SchedulerControllerConfig.RunController - return ret, nil } // BuildKubeControllerConfig builds the struct to create the controller initializers. Eventually we want this to be fully // stock kube with no modification. func BuildKubeControllerConfig(options configapi.MasterConfig) (*KubeControllerConfig, error) { - var err error ret := &KubeControllerConfig{} - kubeExternal, _, err := configapi.GetExternalKubeClient(options.MasterClients.OpenShiftLoopbackKubeConfig, options.MasterClients.OpenShiftLoopbackClientConnectionOverrides) - if err != nil { - return nil, err - } - - var schedulerPolicy *schedulerapi.Policy - if _, err := os.Stat(options.KubernetesMasterConfig.SchedulerConfigFile); err == nil { - schedulerPolicy = &schedulerapi.Policy{} - configData, err := ioutil.ReadFile(options.KubernetesMasterConfig.SchedulerConfigFile) - if err != nil { - return nil, fmt.Errorf("unable to read scheduler config: %v", err) - } - if err := runtime.DecodeInto(latestschedulerapi.Codec, configData, schedulerPolicy); err != nil { - return nil, fmt.Errorf("invalid scheduler configuration: %v", err) - } - } - // resolve extended arguments - // TODO: this should be done in config validation (along with the above) so we can provide - // proper errors - schedulerserver := scheduleroptions.NewSchedulerServer() - schedulerserver.PolicyConfigFile = options.KubernetesMasterConfig.SchedulerConfigFile - if err := cmdflags.Resolve(options.KubernetesMasterConfig.SchedulerArguments, schedulerserver.AddFlags); len(err) > 0 { - return nil, kerrors.NewAggregate(err) - } - ret.SchedulerControllerConfig = SchedulerControllerConfig{ - PrivilegedClient: kubeExternal, - SchedulerServer: schedulerserver, - } - imageTemplate := variable.NewDefaultImageTemplate() imageTemplate.Format = options.ImageConfig.Format imageTemplate.Latest = options.ImageConfig.Latest diff --git a/pkg/cmd/server/kubernetes/master/controller/core.go b/pkg/cmd/server/kubernetes/master/controller/core.go deleted file mode 100644 index 975daa1b3a8d..000000000000 --- a/pkg/cmd/server/kubernetes/master/controller/core.go +++ /dev/null @@ -1,47 +0,0 @@ -package controller - -import ( - "fmt" - - kv1core "k8s.io/client-go/kubernetes/typed/core/v1" - kclientv1 "k8s.io/client-go/pkg/api/v1" - "k8s.io/client-go/tools/record" - kubecontroller "k8s.io/kubernetes/cmd/kube-controller-manager/app" - kapi "k8s.io/kubernetes/pkg/api" - kclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" - schedulerapp "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" - scheduleroptions "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" - _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" -) - -type SchedulerControllerConfig struct { - // TODO: Move this closer to upstream, we want unprivileged client here. - PrivilegedClient kclientset.Interface - SchedulerServer *scheduleroptions.SchedulerServer -} - -func (c *SchedulerControllerConfig) RunController(ctx kubecontroller.ControllerContext) (bool, error) { - eventcast := record.NewBroadcaster() - recorder := eventcast.NewRecorder(kapi.Scheme, kclientv1.EventSource{Component: kapi.DefaultSchedulerName}) - eventcast.StartRecordingToSink(&kv1core.EventSinkImpl{Interface: kv1core.New(c.PrivilegedClient.CoreV1().RESTClient()).Events("")}) - - s, err := schedulerapp.CreateScheduler(c.SchedulerServer, - c.PrivilegedClient, - ctx.InformerFactory.Core().V1().Nodes(), - ctx.InformerFactory.Core().V1().Pods(), - ctx.InformerFactory.Core().V1().PersistentVolumes(), - ctx.InformerFactory.Core().V1().PersistentVolumeClaims(), - ctx.InformerFactory.Core().V1().ReplicationControllers(), - ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(), - ctx.InformerFactory.Apps().V1beta1().StatefulSets(), - ctx.InformerFactory.Core().V1().Services(), - recorder, - ) - if err != nil { - return false, fmt.Errorf("error creating scheduler: %v", err) - } - - go s.Run() - - return true, nil -} diff --git a/pkg/cmd/server/start/start_master.go b/pkg/cmd/server/start/start_master.go index 6674a21ae8e0..c70d67100c4d 100644 --- a/pkg/cmd/server/start/start_master.go +++ b/pkg/cmd/server/start/start_master.go @@ -472,6 +472,9 @@ func (m *Master) Start() error { go func() { controllerPlug.WaitForStart() + // continuously run the scheduler while we have the primary lease + go runEmbeddedScheduler(m.config.MasterClients.OpenShiftLoopbackKubeConfig, m.config.KubernetesMasterConfig.SchedulerConfigFile, m.config.KubernetesMasterConfig.SchedulerArguments) + controllerContext, err := getControllerContext(*m.config, kubeControllerManagerConfig, cloudProvider, informers, utilwait.NeverStop) if err != nil { glog.Fatal(err) diff --git a/pkg/cmd/server/start/start_scheduler.go b/pkg/cmd/server/start/start_scheduler.go new file mode 100644 index 000000000000..5d42853d67bf --- /dev/null +++ b/pkg/cmd/server/start/start_scheduler.go @@ -0,0 +1,53 @@ +package start + +import ( + "github.com/golang/glog" + + kerrors "k8s.io/apimachinery/pkg/util/errors" + schedulerapp "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app" + scheduleroptions "k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/options" + _ "k8s.io/kubernetes/plugin/pkg/scheduler/algorithmprovider" + + cmdflags "github.com/openshift/origin/pkg/cmd/util/flags" +) + +func newScheduler(kubeconfigFile, schedulerConfigFile string, cmdLineArgs map[string][]string) (*scheduleroptions.SchedulerServer, error) { + if cmdLineArgs == nil { + cmdLineArgs = map[string][]string{} + } + if len(cmdLineArgs["kubeconfig"]) == 0 { + cmdLineArgs["kubeconfig"] = []string{kubeconfigFile} + } + if len(cmdLineArgs["policy-config-file"]) == 0 { + cmdLineArgs["policy-config-file"] = []string{schedulerConfigFile} + } + // disable serving http since we didn't used to expose it + if len(cmdLineArgs["port"]) == 0 { + cmdLineArgs["port"] = []string{"-1"} + } + + // resolve arguments + schedulerServer := scheduleroptions.NewSchedulerServer() + if err := cmdflags.Resolve(cmdLineArgs, schedulerServer.AddFlags); len(err) > 0 { + return nil, kerrors.NewAggregate(err) + } + + return schedulerServer, nil +} + +func runEmbeddedScheduler(kubeconfigFile, schedulerConfigFile string, cmdLineArgs map[string][]string) { + for { + // TODO we need a real identity for this. Right now it's just using the loopback connection like it used to. + scheduler, err := newScheduler(kubeconfigFile, schedulerConfigFile, cmdLineArgs) + if err != nil { + glog.Error(err) + continue + } + // this does a second leader election, but doing the second leader election will allow us to move out process in + // 3.8 if we so choose. + if err := schedulerapp.Run(scheduler); err != nil { + glog.Error(err) + continue + } + } +} diff --git a/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/BUILD b/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/BUILD index 72a3c6b35b07..55ce194b94e7 100644 --- a/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/BUILD +++ b/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/BUILD @@ -38,6 +38,7 @@ go_library( "//vendor/github.com/spf13/pflag:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apiserver/pkg/server/healthz:go_default_library", "//vendor/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library", "//vendor/k8s.io/client-go/pkg/api/v1:go_default_library", diff --git a/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/server.go b/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/server.go index e5918600ad5f..e987c656dfa7 100644 --- a/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/server.go +++ b/vendor/k8s.io/kubernetes/plugin/cmd/kube-scheduler/app/server.go @@ -28,6 +28,7 @@ import ( "k8s.io/apiserver/pkg/server/healthz" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions" "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/leaderelection/resourcelock" @@ -93,7 +94,9 @@ func Run(s *options.SchedulerServer) error { return fmt.Errorf("error creating scheduler: %v", err) } - go startHTTP(s) + if s.Port != -1 { + go startHTTP(s) + } stop := make(chan struct{}) defer close(stop) @@ -103,14 +106,14 @@ func Run(s *options.SchedulerServer) error { informerFactory.WaitForCacheSync(stop) controller.WaitForCacheSync("scheduler", stop, podInformer.Informer().HasSynced) - run := func(_ <-chan struct{}) { + run := func(stopCh <-chan struct{}) { sched.Run() - select {} + <-stopCh } if !s.LeaderElection.LeaderElect { - run(nil) - panic("unreachable") + run(stop) + return fmt.Errorf("finished without leader elect") } id, err := os.Hostname() @@ -127,23 +130,29 @@ func Run(s *options.SchedulerServer) error { EventRecorder: recorder, }) if err != nil { - glog.Fatalf("error creating lock: %v", err) + return fmt.Errorf("error creating lock: %v", err) } - leaderelection.RunOrDie(leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: s.LeaderElection.LeaseDuration.Duration, - RenewDeadline: s.LeaderElection.RenewDeadline.Duration, - RetryPeriod: s.LeaderElection.RetryPeriod.Duration, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: run, - OnStoppedLeading: func() { - glog.Fatalf("lost master") + leaderElector, err := leaderelection.NewLeaderElector( + leaderelection.LeaderElectionConfig{ + Lock: rl, + LeaseDuration: s.LeaderElection.LeaseDuration.Duration, + RenewDeadline: s.LeaderElection.RenewDeadline.Duration, + RetryPeriod: s.LeaderElection.RetryPeriod.Duration, + Callbacks: leaderelection.LeaderCallbacks{ + OnStartedLeading: run, + OnStoppedLeading: func() { + utilruntime.HandleError(fmt.Errorf("lost master")) + }, }, - }, - }) + }) + if err != nil { + return err + } + + leaderElector.Run() - panic("unreachable") + return fmt.Errorf("lost lease") } func startHTTP(s *options.SchedulerServer) {