Skip to content

Use leader election library in csi-lib-utils #31

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 2, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
179 changes: 86 additions & 93 deletions Gopkg.lock

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@

[[constraint]]
name = "k8s.io/api"
version = "kubernetes-1.14.0-alpha.1"
version = "kubernetes-1.14.0"

[[constraint]]
name = "k8s.io/apimachinery"
version = "kubernetes-1.14.0-alpha.1"
version = "kubernetes-1.14.0"

[[constraint]]
name = "k8s.io/client-go"
version = "kubernetes-1.14.0-alpha.1"
version = "kubernetes-1.14.0"

[[constraint]]
name = "k8s.io/klog"
Expand All @@ -46,7 +46,7 @@

[[constraint]]
name = "github.com/kubernetes-csi/csi-lib-utils"
version = ">=0.4.0-rc1"
version = ">=0.6.1"

[prune]
non-go = true
Expand Down
56 changes: 22 additions & 34 deletions cmd/csi-resizer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,13 @@ limitations under the License.
package main

import (
"context"
"flag"
"fmt"
"os"
"time"

"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
"github.com/kubernetes-csi/external-resizer/pkg/controller"
"github.com/kubernetes-csi/external-resizer/pkg/resizer"
"github.com/kubernetes-csi/external-resizer/pkg/util"
Expand All @@ -41,24 +43,9 @@ var (
csiTimeout = flag.Duration("csiTimeout", 15*time.Second, "Timeout for waiting for CSI driver socket.")
showVersion = flag.Bool("version", false, "Show version")

enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
leaderElectionIdentity = flag.String("leader-election-identity", "", "Unique identity of this resizer. Typically name of the pod where the resizer runs.")
leaderElectionNamespace = flag.String("leader-election-namespace", "kube-system", "Namespace where this resizer runs.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", time.Second*5,
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", time.Second*15,
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
leaderElectionRenewDeadLine = flag.Duration("leader-election-renew-deadline", time.Second*10,
"The duration that non-leader candidates will wait after observing a leadership "+
"renewal until attempting to acquire leadership of a led but unrenewed leader "+
"slot. This is effectively the maximum duration that a leader can be stopped "+
"before it is replaced by another candidate. This is only applicable if leader "+
"election is enabled.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable leader election.")
leaderElectionNamespace = flag.String("leader-election-namespace", "", "Namespace where the leader election resource lives. Defaults to the pod namespace if not set.")

version = "unknown"
)

Expand Down Expand Up @@ -86,24 +73,25 @@ func main() {
}

resizerName := csiResizer.Name()
rc := controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory)
run := func(ctx context.Context) {
informerFactory.Start(wait.NeverStop)
rc.Run(*workers, ctx)

var leaderElectionConfig *util.LeaderElectionConfig
if *enableLeaderElection {
if leaderElectionIdentity == nil || *leaderElectionIdentity == "" {
klog.Fatal("--leader-election-identity must not be empty")
}
leaderElectionConfig = &util.LeaderElectionConfig{
Identity: *leaderElectionIdentity,
LockName: "external-resizer-" + util.SanitizeName(resizerName),
Namespace: *leaderElectionNamespace,
RetryPeriod: *leaderElectionRetryPeriod,
LeaseDuration: *leaderElectionLeaseDuration,
RenewDeadLine: *leaderElectionRenewDeadLine,
}
}

rc := controller.NewResizeController(resizerName, csiResizer, kubeClient, *resyncPeriod, informerFactory)
if !*enableLeaderElection {
run(context.TODO())
} else {
lockName := "external-resizer-" + util.SanitizeName(resizerName)
le := leaderelection.NewLeaderElection(kubeClient, lockName, run)

informerFactory.Start(wait.NeverStop)
rc.Run(*workers, leaderElectionConfig)
if *leaderElectionNamespace != "" {
le.WithNamespace(*leaderElectionNamespace)
}

if err := le.Run(); err != nil {
klog.Fatalf("error initializing leader election: %v", err)
}
}
}
6 changes: 0 additions & 6 deletions deploy/kubernetes/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,11 @@ spec:
- "--v=5"
- "--csi-address=$(ADDRESS)"
- "--leader-election"
- "--leader-election-namespace=$(MY_NAMESPACE)"
- "--leader-election-identity=$(MY_NAME)"
env:
- name: MY_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: MY_NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/mock.socket
imagePullPolicy: "IfNotPresent"
Expand Down
3 changes: 3 additions & 0 deletions deploy/kubernetes/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ rules:
- apiGroups: [""]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops can we remove the endpoints rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, I'll follow-up

resources: ["endpoints"]
verbs: ["get", "watch", "list", "delete", "update", "create"]
- apiGroups: ["coordination.k8s.io"]
resources: ["leases"]
verbs: ["get", "watch", "list", "delete", "update", "create"]

---
kind: RoleBinding
Expand Down
40 changes: 13 additions & 27 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ import (
// If requested, it will resize according PVs and update PVCs' status to reflect the new size.
type ResizeController interface {
// Run starts the controller.
Run(workers int, leaderElectionConfig *util.LeaderElectionConfig)
Run(workers int, ctx context.Context)
}

type resizeController struct {
Expand Down Expand Up @@ -154,38 +154,24 @@ func getPVCKey(obj interface{}) (string, error) {

// Run starts the controller.
func (ctrl *resizeController) Run(
workers int,
leaderElectionConfig *util.LeaderElectionConfig) {
run := func(ctx context.Context) {
defer ctrl.claimQueue.ShutDown()
workers int, ctx context.Context) {
defer ctrl.claimQueue.ShutDown()

klog.Infof("Starting external resizer %s", ctrl.name)
defer klog.Infof("Shutting down external resizer %s", ctrl.name)
klog.Infof("Starting external resizer %s", ctrl.name)
defer klog.Infof("Shutting down external resizer %s", ctrl.name)

stopCh := ctx.Done()
stopCh := ctx.Done()

if !cache.WaitForCacheSync(stopCh, ctrl.pvSynced, ctrl.pvcSynced) {
klog.Errorf("Cannot sync pv/pvc caches")
return
}

for i := 0; i < workers; i++ {
go wait.Until(ctrl.syncPVCs, 0, stopCh)
}

<-stopCh
if !cache.WaitForCacheSync(stopCh, ctrl.pvSynced, ctrl.pvcSynced) {
klog.Errorf("Cannot sync pv/pvc caches")
return
}

if leaderElectionConfig == nil {
// Leader election disabled.
run(context.TODO())
} else {
lock, err := util.NewLeaderLock(ctrl.kubeClient, ctrl.eventRecorder, leaderElectionConfig)
if err != nil {
klog.Fatalf("Error creating leader election lock: %v", err)
}
util.RunAsLeader(lock, leaderElectionConfig, run)
for i := 0; i < workers; i++ {
go wait.Until(ctrl.syncPVCs, 0, stopCh)
}

<-stopCh
}

// syncPVCs is the main worker.
Expand Down
75 changes: 0 additions & 75 deletions pkg/util/leaderelection.go

This file was deleted.

Loading