Skip to content

Commit 8e86a44

Browse files
committed
use csi-lib-utils for leader election, also add support for lease based leader election
Signed-off-by: Andrew Sy Kim <[email protected]>
1 parent 8431d2f commit 8e86a44

File tree

1 file changed

+55
-4
lines changed

1 file changed

+55
-4
lines changed

cmd/csi-provisioner/csi-provisioner.go

Lines changed: 55 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@ limitations under the License.
1717
package main
1818

1919
import (
20+
"context"
2021
goflag "flag"
2122
"fmt"
23+
"io/ioutil"
2224
"math/rand"
2325
"os"
2426
"strconv"
@@ -27,6 +29,7 @@ import (
2729

2830
flag "github.com/spf13/pflag"
2931

32+
"github.com/kubernetes-csi/csi-lib-utils/leaderelection"
3033
ctrl "github.com/kubernetes-csi/external-provisioner/pkg/controller"
3134
snapclientset "github.com/kubernetes-csi/external-snapshotter/pkg/client/clientset/versioned"
3235
"sigs.k8s.io/sig-storage-lib-external-provisioner/controller"
@@ -51,19 +54,25 @@ var (
5154
volumeNamePrefix = flag.String("volume-name-prefix", "pvc", "Prefix to apply to the name of a created volume.")
5255
volumeNameUUIDLength = flag.Int("volume-name-uuid-length", -1, "Truncates generated UUID of a created volume to this length. Defaults behavior is to NOT truncate.")
5356
showVersion = flag.Bool("version", false, "Show version.")
54-
enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
5557
retryIntervalStart = flag.Duration("retry-interval-start", time.Second, "Initial retry interval of failed provisioning or deletion. It doubles with each failure, up to retry-interval-max.")
5658
retryIntervalMax = flag.Duration("retry-interval-max", 5*time.Minute, "Maximum retry interval of failed provisioning or deletion.")
5759
workerThreads = flag.Uint("worker-threads", 100, "Number of provisioner worker threads, in other words nr. of simultaneous CSI calls.")
5860
operationTimeout = flag.Duration("timeout", 10*time.Second, "Timeout for waiting for creation or deletion of a volume")
5961
provisioner = flag.String("provisioner", "", "This option is deprecated")
6062

63+
enableLeaderElection = flag.Bool("enable-leader-election", false, "Enables leader election. If leader election is enabled, additional RBAC rules are required. Please refer to the Kubernetes CSI documentation for instructions on setting up these RBAC rules.")
64+
leaderElectionType = flag.String("leader-election-type", "endpoints", "the type of leader election, options are 'endpoints' (default) or 'lease' (strongly recommended)")
65+
6166
featureGates map[string]bool
6267
provisionController *controller.ProvisionController
6368
version = "unknown"
6469
)
6570

66-
func init() {
71+
type leaderElection interface {
72+
Run() error
73+
}
74+
75+
func main() {
6776
var config *rest.Config
6877
var err error
6978

@@ -184,8 +193,50 @@ func init() {
184193
serverVersion.GitVersion,
185194
provisionerOptions...,
186195
)
196+
197+
run := func(context.Context) {
198+
provisionController.Run(wait.NeverStop)
199+
}
200+
201+
if !*enableLeaderElection {
202+
run(context.TODO())
203+
} else {
204+
// getInClusterNamespace is copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
205+
// to preserve backwards compatibilty
206+
namespace := getInClusterNamespace()
207+
// this lock name pattern is also copied from sigs.k8s.io/sig-storage-lib-external-provisioner/controller
208+
// to preserve backwards compatibility
209+
lockName := strings.Replace(provisionerName, "/", "-", -1)
210+
211+
var le leaderElection
212+
if *leaderElectionType == "endpoints" {
213+
le = leaderelection.NewLeaderElectionWithEndpoints(clientset, lockName, namespace, run)
214+
} else if *leaderElectionType == "lease" {
215+
le = leaderelection.NewLeaderElection(clientset, lockName, namespace, run)
216+
} else {
217+
klog.Error("--leader-election-type must be either 'endpoints' or 'lease'")
218+
os.Exit(1)
219+
}
220+
221+
if err := le.Run(); err != nil {
222+
klog.Fatalf("failed to initialize leader election: %v", err)
223+
}
224+
}
225+
187226
}
188227

189-
func main() {
190-
provisionController.Run(wait.NeverStop)
228+
// getInClusterNamespace returns the namespace in which the controller runs.
229+
func getInClusterNamespace() string {
230+
if ns := os.Getenv("POD_NAMESPACE"); ns != "" {
231+
return ns
232+
}
233+
234+
// Fall back to the namespace associated with the service account token, if available
235+
if data, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace"); err == nil {
236+
if ns := strings.TrimSpace(string(data)); len(ns) > 0 {
237+
return ns
238+
}
239+
}
240+
241+
return "default"
191242
}

0 commit comments

Comments
 (0)