Skip to content

Commit 3c82578

Browse files
10000-kimetacosm
authored and committed
feat: support for graceful shutdown based on configuration (#2479)
--------- Signed-off-by: 10000-ki <[email protected]>
1 parent 8614362 commit 3c82578

File tree

7 files changed

+72
-42
lines changed

7 files changed

+72
-42
lines changed

Diff for: docs/content/en/docs/patterns-and-best-practices/_index.md

+12
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,15 @@ might be a permission issue for some resources in another namespace.
120120
The `stopOnInformerErrorDuringStartup` setting has implications for the [cache sync timeout](https://github.com/java-operator-sdk/java-operator-sdk/blob/114c4312c32b34688811df8dd7cea275878c9e73/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L177-L179)
121121
behavior. If `true`, the operator will stop on cache sync timeout. If `false`, after the timeout the controller will start
122122
reconciling resources even if one or more event source caches have not synced yet.
123+
124+
## Graceful Shutdown
125+
126+
You can give the reconciler enough time to finish processing in-flight events before the operator shuts down.
127+
The configuration is simple. You just need to set an appropriate duration value for `reconciliationTerminationTimeout` using `ConfigurationServiceOverrider`.
128+
129+
```java
130+
final var overridden = new ConfigurationServiceOverrider(config)
131+
.withReconciliationTerminationTimeout(Duration.ofSeconds(5)).build();
132+
133+
final var operator = new Operator(overridden);
134+
```

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

+8-10
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ private static ConfigurationService initConfigurationService(KubernetesClient cl
100100
@SuppressWarnings("unused")
101101
public void installShutdownHook(Duration gracefulShutdownTimeout) {
102102
if (!leaderElectionManager.isLeaderElectionEnabled()) {
103-
Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout)));
103+
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
104104
} else {
105105
log.warn("Leader election is on, shutdown hook will not be installed.");
106106
}
@@ -145,15 +145,18 @@ public synchronized void start() {
145145
}
146146
}
147147

148-
public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
148+
@Override
149+
public void stop() throws OperatorException {
150+
Duration reconciliationTerminationTimeout =
151+
configurationService.reconciliationTerminationTimeout();
149152
if (!started) {
150153
return;
151154
}
152-
log.info(
153-
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
155+
log.info("Operator SDK {} is shutting down...",
156+
configurationService.getVersion().getSdkVersion());
154157
controllerManager.stop();
155158

156-
configurationService.getExecutorServiceManager().stop(gracefulShutdownTimeout);
159+
configurationService.getExecutorServiceManager().stop(reconciliationTerminationTimeout);
157160
leaderElectionManager.stop();
158161
if (configurationService.closeClientOnStop()) {
159162
getKubernetesClient().close();
@@ -162,11 +165,6 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
162165
started = false;
163166
}
164167

165-
@Override
166-
public void stop() throws OperatorException {
167-
stop(Duration.ZERO);
168-
}
169-
170168
/**
171169
* Add a registration request for the specified reconciler with this operator. The effective
172170
* registration of the reconciler is delayed till the operator is started.

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+27-20
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,8 @@ static ConfigurationService newOverriddenConfigurationService(
104104
*
105105
* @param reconciler the reconciler we want the configuration of
106106
* @param <R> the {@code CustomResource} type associated with the specified reconciler
107-
* @return the {@link ControllerConfiguration} associated with the specified reconciler or {@code
108-
* null} if no configuration exists for the reconciler
107+
* @return the {@link ControllerConfiguration} associated with the specified reconciler or
108+
* {@code null} if no configuration exists for the reconciler
109109
*/
110110
<R extends HasMetadata> ControllerConfiguration<R> getConfigurationFor(Reconciler<R> reconciler);
111111

@@ -214,7 +214,7 @@ default int concurrentWorkflowExecutorThreads() {
214214

215215
/**
216216
* Override to provide a custom {@link Metrics} implementation
217-
*
217+
*
218218
* @return the {@link Metrics} implementation
219219
*/
220220
default Metrics getMetrics() {
@@ -224,7 +224,7 @@ default Metrics getMetrics() {
224224
/**
225225
* Override to provide a custom {@link ExecutorService} implementation to change how threads
226226
* handle concurrent reconciliations
227-
*
227+
*
228228
* @return the {@link ExecutorService} implementation to use for concurrent reconciliation
229229
* processing
230230
*/
@@ -235,7 +235,7 @@ default ExecutorService getExecutorService() {
235235
/**
236236
* Override to provide a custom {@link ExecutorService} implementation to change how dependent
237237
* workflows are processed in parallel
238-
*
238+
*
239239
* @return the {@link ExecutorService} implementation to use for dependent workflow processing
240240
*/
241241
default ExecutorService getWorkflowExecutorService() {
@@ -245,7 +245,7 @@ default ExecutorService getWorkflowExecutorService() {
245245
/**
246246
* Determines whether the associated Kubernetes client should be closed when the associated
247247
* {@link io.javaoperatorsdk.operator.Operator} is stopped.
248-
*
248+
*
249249
* @return {@code true} if the Kubernetes client should be closed on stop, {@code false} otherwise
250250
*/
251251
default boolean closeClientOnStop() {
@@ -255,7 +255,7 @@ default boolean closeClientOnStop() {
255255
/**
256256
* Override to provide a custom {@link DependentResourceFactory} implementation to change how
257257
* {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} are instantiated
258-
*
258+
*
259259
* @return the custom {@link DependentResourceFactory} implementation
260260
*/
261261
@SuppressWarnings("rawtypes")
@@ -267,7 +267,7 @@ default DependentResourceFactory dependentResourceFactory() {
267267
* Retrieves the optional {@link LeaderElectionConfiguration} to specify how the associated
268268
* {@link io.javaoperatorsdk.operator.Operator} handles leader election to ensure only one
269269
* instance of the operator runs on the cluster at any given time
270-
*
270+
*
271271
* @return the {@link LeaderElectionConfiguration}
272272
*/
273273
default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
@@ -302,6 +302,17 @@ default Duration cacheSyncTimeout() {
302302
return Duration.ofMinutes(2);
303303
}
304304

305+
/**
306+
* This is the timeout value that allows the reconciliation threads to gracefully shut down. If no
307+
* value is set, the default is immediate shutdown.
308+
*
309+
* @return The duration of time to wait before terminating the reconciliation threads
310+
* @since 5.0.0
311+
*/
312+
default Duration reconciliationTerminationTimeout() {
313+
return Duration.ZERO;
314+
}
315+
305316
/**
306317
* Handler for an informer stop. Informer stops if there is a non-recoverable error, such as receiving
307318
* a resource that cannot be deserialized.
@@ -329,7 +340,7 @@ default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
329340
* Override to provide a custom {@link ManagedWorkflowFactory} implementation to change how
330341
* {@link io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow} are
331342
* instantiated
332-
*
343+
*
333344
* @return the custom {@link ManagedWorkflowFactory} implementation
334345
*/
335346
@SuppressWarnings("rawtypes")
@@ -339,7 +350,7 @@ default ManagedWorkflowFactory getWorkflowFactory() {
339350

340351
/**
341352
* Override to provide a custom {@link ExecutorServiceManager} implementation
342-
*
353+
*
343354
* @return the custom {@link ExecutorServiceManager} implementation
344355
*/
345356
default ExecutorServiceManager getExecutorServiceManager() {
@@ -356,9 +367,8 @@ default ExecutorServiceManager getExecutorServiceManager() {
356367
* SSA based create/update can be still used with the legacy matching, just overriding the match
357368
* method of Kubernetes Dependent Resource.
358369
*
359-
* @since 4.4.0
360-
*
361370
* @return if SSA should be used for dependent resources
371+
* @since 4.4.0
362372
*/
363373
default boolean ssaBasedCreateUpdateMatchForDependentResources() {
364374
return true;
@@ -443,9 +453,8 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
443453
* <p>
444454
* Disable this if you want to react to your own dependent resource updates
445455
*
446-
* @since 4.5.0
447-
*
448456
* @return if special annotation should be used for dependent resource to filter events
457+
* @since 4.5.0
449458
*/
450459
default boolean previousAnnotationForDependentResourcesEventFiltering() {
451460
return true;
@@ -460,9 +469,8 @@ default boolean previousAnnotationForDependentResourcesEventFiltering() {
460469
* logic, and you want to further minimize the amount of work done / updates issued by the
461470
* operator.
462471
*
463-
* @since 4.5.0
464-
*
465472
* @return if resource version should be parsed (as integer)
473+
* @since 4.5.0
466474
*/
467475
default boolean parseResourceVersionsForEventFilteringAndCaching() {
468476
return false;
@@ -475,8 +483,8 @@ default boolean parseResourceVersionsForEventFilteringAndCaching() {
475483
*
476484
* @return {@code true} if Server-Side Apply (SSA) should be used when patching the primary
477485
* resources, {@code false} otherwise
478-
* @since 5.0.0
479486
* @see ConfigurationServiceOverrider#withUseSSAToPatchPrimaryResource(boolean)
487+
* @since 5.0.0
480488
*/
481489
default boolean useSSAToPatchPrimaryResource() {
482490
return true;
@@ -487,18 +495,17 @@ default boolean useSSAToPatchPrimaryResource() {
487495
* Determines whether resources retrieved from caches such as via calls to
488496
* {@link Context#getSecondaryResource(Class)} should be defensively cloned first.
489497
* </p>
490-
*
498+
*
491499
* <p>
492500
* Defensive cloning to prevent problematic cache modifications (modifying the resource would
493501
* otherwise modify the stored copy in the cache) was transparently done in previous JOSDK
494502
* versions. This might have performance consequences and, with the more prevalent use of
495503
* Server-Side Apply, where you should create a new copy of your resource with only modified
496504
* fields, such modifications of these resources are less likely to occur.
497505
* </p>
498-
*
506+
*
499507
* @return {@code true} if resources should be defensively cloned before returning them from
500508
* caches, {@code false} otherwise
501-
*
502509
* @since 5.0.0
503510
*/
504511
default boolean cloneSecondaryResourcesWhenGettingFromCache() {

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

+13
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public class ConfigurationServiceOverrider {
3333
private InformerStoppedHandler informerStoppedHandler;
3434
private Boolean stopOnInformerErrorDuringStartup;
3535
private Duration cacheSyncTimeout;
36+
private Duration reconciliationTerminationTimeout;
3637
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
3738
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
3839
private Boolean previousAnnotationForDependentResources;
@@ -132,6 +133,12 @@ public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTime
132133
return this;
133134
}
134135

136+
public ConfigurationServiceOverrider withReconciliationTerminationTimeout(
137+
Duration reconciliationTerminationTimeout) {
138+
this.reconciliationTerminationTimeout = reconciliationTerminationTimeout;
139+
return this;
140+
}
141+
135142
public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentResources(
136143
boolean value) {
137144
this.ssaBasedCreateUpdateMatchForDependentResources = value;
@@ -273,6 +280,12 @@ public Duration cacheSyncTimeout() {
273280
return overriddenValueOrDefault(cacheSyncTimeout, ConfigurationService::cacheSyncTimeout);
274281
}
275282

283+
@Override
284+
public Duration reconciliationTerminationTimeout() {
285+
return overriddenValueOrDefault(reconciliationTerminationTimeout,
286+
ConfigurationService::reconciliationTerminationTimeout);
287+
}
288+
276289
@Override
277290
public boolean ssaBasedCreateUpdateMatchForDependentResources() {
278291
return overriddenValueOrDefault(ssaBasedCreateUpdateMatchForDependentResources,

Diff for: operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

+4
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
34
import java.util.Optional;
45
import java.util.concurrent.Executors;
56

@@ -63,6 +64,7 @@ public <R extends HasMetadata> R clone(R object) {
6364
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
6465
.withInformerStoppedHandler((informer, ex) -> {
6566
})
67+
.withReconciliationTerminationTimeout(Duration.ofSeconds(30))
6668
.build();
6769

6870
assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -77,6 +79,8 @@ public <R extends HasMetadata> R clone(R object) {
7779
overridden.getLeaderElectionConfiguration());
7880
assertNotEquals(config.getInformerStoppedHandler(),
7981
overridden.getLeaderElectionConfiguration());
82+
assertNotEquals(config.reconciliationTerminationTimeout(),
83+
overridden.reconciliationTerminationTimeout());
8084
}
8185

8286
}

Diff for: operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java

+6-11
Original file line numberDiff line numberDiff line change
@@ -18,26 +18,21 @@
1818
public class GracefulStopIT {
1919

2020
public static final String TEST_1 = "test1";
21-
public static final String TEST_2 = "test2";
2221

2322
@RegisterExtension
2423
LocallyRunOperatorExtension operator =
2524
LocallyRunOperatorExtension.builder()
26-
.withConfigurationService(o -> o.withCloseClientOnStop(false))
25+
.withConfigurationService(o -> o.withCloseClientOnStop(false)
26+
.withReconciliationTerminationTimeout(Duration.ofMillis(RECONCILER_SLEEP)))
2727
.withReconciler(new GracefulStopTestReconciler())
2828
.build();
2929

3030
@Test
31-
void stopsGracefullyWIthTimeout() {
32-
testGracefulStop(TEST_1, RECONCILER_SLEEP, 2);
31+
void stopsGracefullyWithTimeoutConfiguration() {
32+
testGracefulStop(TEST_1, 2);
3333
}
3434

35-
@Test
36-
void stopsGracefullyWithExpiredTimeout() {
37-
testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1);
38-
}
39-
40-
private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) {
35+
private void testGracefulStop(String resourceName, int expectedFinalGeneration) {
4136
var testRes = operator.create(testResource(resourceName));
4237
await().untilAsserted(() -> {
4338
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
@@ -54,7 +49,7 @@ private void testGracefulStop(String resourceName, int stopTimeout, int expected
5449
() -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
5550
.getNumberOfExecutions()).isEqualTo(2));
5651

57-
operator.getOperator().stop(Duration.ofMillis(stopTimeout));
52+
operator.getOperator().stop();
5853

5954
await().untilAsserted(() -> {
6055
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);

Diff for: operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ void beforeEach(TestInfo testInfo) {
7676
@AfterEach
7777
void cleanup() {
7878
if (operator != null) {
79-
operator.stop(Duration.ofSeconds(1));
79+
operator.stop();
8080
}
8181
adminClient.resource(dependentConfigMap()).delete();
8282
adminClient.resource(testCustomResource()).delete();
@@ -321,6 +321,7 @@ Operator startOperator(boolean stopOnInformerErrorDuringStartup, boolean addStop
321321
co.withKubernetesClient(clientUsingServiceAccount());
322322
co.withStopOnInformerErrorDuringStartup(stopOnInformerErrorDuringStartup);
323323
co.withCacheSyncTimeout(Duration.ofMillis(3000));
324+
co.withReconciliationTerminationTimeout(Duration.ofSeconds(1));
324325
if (addStopHandler) {
325326
co.withInformerStoppedHandler((informer, ex) -> replacementStopHandlerCalled = true);
326327
}

0 commit comments

Comments
 (0)