1
1
package io .javaoperatorsdk .operator ;
2
2
3
+ import java .io .Closeable ;
3
4
import java .io .IOException ;
4
5
import java .net .ConnectException ;
5
6
import java .util .LinkedList ;
@@ -25,8 +26,7 @@ public class Operator implements AutoCloseable {
25
26
private static final Logger log = LoggerFactory .getLogger (Operator .class );
26
27
private final KubernetesClient k8sClient ;
27
28
private final ConfigurationService configurationService ;
28
- private final List <ConfiguredController > controllers = new LinkedList <>();
29
- private volatile boolean started = false ;
29
+ private final ControllerManager controllers = new ControllerManager ();
30
30
31
31
public Operator (KubernetesClient k8sClient , ConfigurationService configurationService ) {
32
32
this .k8sClient = k8sClient ;
@@ -62,14 +62,8 @@ public ConfigurationService getConfigurationService() {
62
62
* where there is no obvious entrypoint to the application which can trigger the injection process
63
63
* and start the cluster monitoring processes.
64
64
*/
65
- @ SuppressWarnings ("unchecked" )
66
65
public void start () {
67
- if (started ) {
68
- return ;
69
- }
70
- if (controllers .isEmpty ()) {
71
- throw new OperatorException ("No ResourceController exists. Exiting!" );
72
- }
66
+ controllers .shouldStart ();
73
67
74
68
final var version = configurationService .getVersion ();
75
69
log .info (
@@ -95,8 +89,7 @@ public void start() {
95
89
throw new OperatorException (error , e );
96
90
}
97
91
98
- controllers .parallelStream ().forEach (ConfiguredController ::start );
99
- started = true ;
92
+ controllers .start ();
100
93
}
101
94
102
95
/** Stop the operator. */
@@ -105,20 +98,7 @@ public void close() {
105
98
log .info (
106
99
"Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
107
100
108
- if (!started ) {
109
- return ;
110
- }
111
-
112
- this .controllers .parallelStream ().forEach (closeable -> {
113
- try {
114
- log .debug ("closing {}" , closeable );
115
- closeable .close ();
116
- } catch (IOException e ) {
117
- log .warn ("Error closing {}" , closeable , e );
118
- }
119
- });
120
-
121
- started = false ;
101
+ controllers .close ();
122
102
}
123
103
124
104
/**
@@ -164,10 +144,7 @@ public <R extends CustomResource> void register(
164
144
}
165
145
final var configuredController =
166
146
new ConfiguredController (controller , configuration , k8sClient );
167
- this .controllers .add (configuredController );
168
- if (started ) {
169
- configuredController .start ();
170
- }
147
+ controllers .add (configuredController );
171
148
172
149
final var watchedNS =
173
150
configuration .watchAllNamespaces ()
@@ -180,4 +157,49 @@ public <R extends CustomResource> void register(
180
157
watchedNS );
181
158
}
182
159
}
160
+
161
+ private static class ControllerManager implements Closeable {
162
+ private final List <ConfiguredController > controllers = new LinkedList <>();
163
+ private boolean started = false ;
164
+
165
+
166
+ public synchronized void shouldStart () {
167
+ if (started ) {
168
+ return ;
169
+ }
170
+ if (controllers .isEmpty ()) {
171
+ throw new OperatorException ("No ResourceController exists. Exiting!" );
172
+ }
173
+ }
174
+
175
+ public synchronized void start () {
176
+ controllers .parallelStream ().forEach (ConfiguredController ::start );
177
+ started = true ;
178
+ }
179
+
180
+ @ Override
181
+ public synchronized void close () {
182
+ if (!started ) {
183
+ return ;
184
+ }
185
+
186
+ this .controllers .parallelStream ().forEach (closeable -> {
187
+ try {
188
+ log .debug ("closing {}" , closeable );
189
+ closeable .close ();
190
+ } catch (IOException e ) {
191
+ log .warn ("Error closing {}" , closeable , e );
192
+ }
193
+ });
194
+
195
+ started = false ;
196
+ }
197
+
198
+ public synchronized void add (ConfiguredController configuredController ) {
199
+ this .controllers .add (configuredController );
200
+ if (started ) {
201
+ configuredController .start ();
202
+ }
203
+ }
204
+ }
183
205
}
0 commit comments