7
7
import org .slf4j .Logger ;
8
8
import org .slf4j .LoggerFactory ;
9
9
10
- import io .fabric8 .kubernetes .api .model .authorization .v1 .*;
10
+ import io .fabric8 .kubernetes .api .model .authorization .v1 .SelfSubjectRulesReview ;
11
+ import io .fabric8 .kubernetes .api .model .authorization .v1 .SelfSubjectRulesReviewSpecBuilder ;
11
12
import io .fabric8 .kubernetes .client .KubernetesClient ;
12
13
import io .fabric8 .kubernetes .client .extended .leaderelection .LeaderCallbacks ;
13
14
import io .fabric8 .kubernetes .client .extended .leaderelection .LeaderElectionConfig ;
14
15
import io .fabric8 .kubernetes .client .extended .leaderelection .LeaderElector ;
15
16
import io .fabric8 .kubernetes .client .extended .leaderelection .LeaderElectorBuilder ;
16
17
import io .fabric8 .kubernetes .client .extended .leaderelection .resourcelock .LeaseLock ;
17
- import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
18
- import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
18
+ import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
19
19
import io .javaoperatorsdk .operator .api .config .LeaderElectionConfiguration ;
20
20
21
21
public class LeaderElectionManager {
@@ -24,51 +24,58 @@ public class LeaderElectionManager {
24
24
25
25
public static final String NO_PERMISSION_TO_LEASE_RESOURCE_MESSAGE =
26
26
"No permission to lease resource." ;
27
+ public static final String UNIVERSAL_VERB = "*" ;
28
+ public static final String COORDINATION_GROUP = "coordination.k8s.io" ;
29
+ public static final String LEASES_RESOURCE = "leases" ;
27
30
28
31
private LeaderElector leaderElector = null ;
29
32
private final ControllerManager controllerManager ;
30
33
private String identity ;
31
34
private CompletableFuture <?> leaderElectionFuture ;
32
- private KubernetesClient client ;
33
- private String leaseName ;
35
+ private KubernetesClient kubernetesClient ;
36
+ private final ConfigurationService configurationService ;
34
37
private String leaseNamespace ;
35
38
36
- public LeaderElectionManager (ControllerManager controllerManager ) {
39
+ public LeaderElectionManager (KubernetesClient kubernetesClient ,
40
+ ControllerManager controllerManager ,
41
+ ConfigurationService configurationService ) {
42
+ this .kubernetesClient = kubernetesClient ;
37
43
this .controllerManager = controllerManager ;
44
+ this .configurationService = configurationService ;
38
45
}
39
46
40
- public void init (LeaderElectionConfiguration config , KubernetesClient client ) {
41
- this .client = client ;
47
+ public boolean isLeaderElectionEnabled () {
48
+ return configurationService .getLeaderElectionConfiguration ().isPresent ();
49
+ }
50
+
51
+ private void init (LeaderElectionConfiguration config ) {
42
52
this .identity = identity (config );
43
- this .leaseName = config .getLeaseName ();
44
53
leaseNamespace =
45
54
config .getLeaseNamespace ().orElseGet (
46
- () -> ConfigurationServiceProvider . instance () .getClientConfiguration ().getNamespace ());
55
+ () -> configurationService .getClientConfiguration ().getNamespace ());
47
56
if (leaseNamespace == null ) {
48
57
final var message =
49
58
"Lease namespace is not set and cannot be inferred. Leader election cannot continue." ;
50
59
log .error (message );
51
60
throw new IllegalArgumentException (message );
52
61
}
53
- final var lock = new LeaseLock (leaseNamespace , leaseName , identity );
62
+ final var lock = new LeaseLock (leaseNamespace , config . getLeaseName () , identity );
54
63
// releaseOnCancel is not used in the underlying implementation
55
- leaderElector =
56
- new LeaderElectorBuilder (
57
- client , ExecutorServiceManager . instance (). executorService ())
58
- . withConfig ( new LeaderElectionConfig (
64
+ leaderElector = new LeaderElectorBuilder (
65
+ kubernetesClient , configurationService . getExecutorServiceManager (). cachingExecutorService ())
66
+ . withConfig (
67
+ new LeaderElectionConfig (
59
68
lock ,
60
69
config .getLeaseDuration (),
61
70
config .getRenewDeadline (),
62
71
config .getRetryPeriod (),
63
72
leaderCallbacks (),
64
73
true ,
65
74
config .getLeaseName ()))
66
- .build ();
75
+ .build ();
67
76
}
68
77
69
- public boolean isLeaderElectionEnabled () {
70
- return leaderElector != null ;
71
- }
78
+
72
79
73
80
private LeaderCallbacks leaderCallbacks () {
74
81
return new LeaderCallbacks (
@@ -99,6 +106,7 @@ private String identity(LeaderElectionConfiguration config) {
99
106
100
107
public void start () {
101
108
if (isLeaderElectionEnabled ()) {
109
+ init (configurationService .getLeaderElectionConfiguration ().orElseThrow ());
102
110
checkLeaseAccess ();
103
111
leaderElectionFuture = leaderElector .start ();
104
112
}
@@ -114,12 +122,12 @@ private void checkLeaseAccess() {
114
122
var verbs = Arrays .asList ("create" , "update" , "get" );
115
123
SelfSubjectRulesReview review = new SelfSubjectRulesReview ();
116
124
review .setSpec (new SelfSubjectRulesReviewSpecBuilder ().withNamespace (leaseNamespace ).build ());
117
- var reviewResult = client .resource (review ).create ();
125
+ var reviewResult = kubernetesClient .resource (review ).create ();
118
126
log .debug ("SelfSubjectRulesReview result: {}" , reviewResult );
119
127
var foundRule = reviewResult .getStatus ().getResourceRules ().stream ()
120
- .filter (rule -> rule .getApiGroups ().contains ("coordination.k8s.io" )
121
- && rule .getResources ().contains ("leases" )
122
- && (rule .getVerbs ().containsAll (verbs )) || rule .getVerbs ().contains ("*" ))
128
+ .filter (rule -> rule .getApiGroups ().contains (COORDINATION_GROUP )
129
+ && rule .getResources ().contains (LEASES_RESOURCE )
130
+ && (rule .getVerbs ().containsAll (verbs )) || rule .getVerbs ().contains (UNIVERSAL_VERB ))
123
131
.findAny ();
124
132
if (foundRule .isEmpty ()) {
125
133
throw new OperatorException (NO_PERMISSION_TO_LEASE_RESOURCE_MESSAGE +
0 commit comments