Skip to content

Commit a6013d8

Browse files
metacosmcsviri
authored andcommitted
fix: avoid always creating ExecutorServices, enforce valid values (#1891)
Fixes #1889
1 parent 9ef0cc4 commit a6013d8

File tree

8 files changed

+125
-54
lines changed

8 files changed

+125
-54
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,13 @@ public synchronized void start() {
134134
version.getSdkVersion(),
135135
version.getCommit(),
136136
version.getBuiltTime());
137-
138137
final var clientVersion = Version.clientVersion();
139138
log.info("Client version: {}", clientVersion);
139+
140+
// need to create new thread pools if we're restarting because they've been shut down when we
141+
// previously stopped
142+
configurationService.getExecutorServiceManager().start(configurationService);
143+
140144
// first start the controller manager before leader election,
141145
// the leader election would start subsequently the processor if on
142146
controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled());

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AbstractConfigurationService.java

+23-9
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,37 @@ public class AbstractConfigurationService implements ConfigurationService {
1717
private final Version version;
1818
private Cloner cloner;
1919
private ObjectMapper mapper;
20+
private ExecutorServiceManager executorServiceManager;
2021

2122
public AbstractConfigurationService(Version version) {
22-
this(version, null, null);
23+
this(version, null, null, null);
2324
}
2425

2526
public AbstractConfigurationService(Version version, Cloner cloner) {
26-
this(version, cloner, null);
27+
this(version, cloner, null, null);
2728
}
2829

29-
public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) {
30+
public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper,
31+
ExecutorServiceManager executorServiceManager) {
3032
this.version = version;
31-
init(cloner, mapper);
33+
init(cloner, mapper, executorServiceManager);
3234
}
3335

3436
/**
35-
* Subclasses can call this method to more easily initialize the {@link Cloner} and
36-
* {@link ObjectMapper} associated with this ConfigurationService implementation. This is useful
37-
* in situations where the cloner depends on a mapper that might require additional configuration
38-
* steps before it's ready to be used.
37+
* Subclasses can call this method to more easily initialize the {@link Cloner}
38+
* {@link ObjectMapper} and {@link ExecutorServiceManager} associated with this
39+
* ConfigurationService implementation. This is useful in situations where the cloner depends on a
40+
* mapper that might require additional configuration steps before it's ready to be used.
3941
*
4042
* @param cloner the {@link Cloner} instance to be used
4143
* @param mapper the {@link ObjectMapper} instance to be used
44+
* @param executorServiceManager the {@link ExecutorServiceManager} instance to be used
4245
*/
43-
protected void init(Cloner cloner, ObjectMapper mapper) {
46+
protected void init(Cloner cloner, ObjectMapper mapper,
47+
ExecutorServiceManager executorServiceManager) {
4448
this.cloner = cloner != null ? cloner : ConfigurationService.super.getResourceCloner();
4549
this.mapper = mapper != null ? mapper : ConfigurationService.super.getObjectMapper();
50+
this.executorServiceManager = executorServiceManager;
4651
}
4752

4853
protected <R extends HasMetadata> void register(ControllerConfiguration<R> config) {
@@ -132,4 +137,13 @@ public Cloner getResourceCloner() {
132137
public ObjectMapper getObjectMapper() {
133138
return mapper;
134139
}
140+
141+
@Override
142+
public ExecutorServiceManager getExecutorServiceManager() {
143+
// lazy init to avoid initializing thread pools for nothing in an overriding scenario
144+
if (executorServiceManager == null) {
145+
executorServiceManager = ConfigurationService.super.getExecutorServiceManager();
146+
}
147+
return executorServiceManager;
148+
}
135149
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public BaseConfigurationService(Version version) {
4545
}
4646

4747
public BaseConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) {
48-
super(version, cloner, mapper);
48+
super(version, cloner, mapper, null);
4949
}
5050

5151
public BaseConfigurationService(Version version, Cloner cloner) {
@@ -62,6 +62,7 @@ protected void logMissingReconcilerWarning(String reconcilerKey, String reconcil
6262
reconcilersNameMessage);
6363
}
6464

65+
@SuppressWarnings("unused")
6566
public String getLoggerName() {
6667
return LOGGER_NAME;
6768
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

+6-9
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@
44
import java.util.Optional;
55
import java.util.Set;
66
import java.util.concurrent.ExecutorService;
7-
import java.util.concurrent.LinkedBlockingDeque;
8-
import java.util.concurrent.ThreadPoolExecutor;
9-
import java.util.concurrent.TimeUnit;
107
import java.util.function.Consumer;
118

129
import org.slf4j.Logger;
@@ -24,6 +21,8 @@
2421
import com.fasterxml.jackson.core.JsonProcessingException;
2522
import com.fasterxml.jackson.databind.ObjectMapper;
2623

24+
import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor;
25+
2726
/** An interface from which to retrieve configuration information. */
2827
public interface ConfigurationService {
2928

@@ -164,15 +163,13 @@ default Metrics getMetrics() {
164163
}
165164

166165
default ExecutorService getExecutorService() {
167-
return new ThreadPoolExecutor(minConcurrentReconciliationThreads(),
168-
concurrentReconciliationThreads(),
169-
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
166+
return newThreadPoolExecutor(minConcurrentReconciliationThreads(),
167+
concurrentReconciliationThreads());
170168
}
171169

172170
default ExecutorService getWorkflowExecutorService() {
173-
return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
174-
concurrentWorkflowExecutorThreads(),
175-
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
171+
return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
172+
concurrentWorkflowExecutorThreads());
176173
}
177174

178175
default boolean closeClientOnStop() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

+22-6
Original file line numberDiff line numberDiff line change
@@ -57,13 +57,21 @@ public ConfigurationServiceOverrider withConcurrentWorkflowExecutorThreads(int t
5757
return this;
5858
}
5959

60+
private int minimumMaxValueFor(Integer minValue) {
61+
return minValue != null ? (minValue < 0 ? 0 : minValue) + 1 : 1;
62+
}
63+
6064
public ConfigurationServiceOverrider withMinConcurrentReconciliationThreads(int threadNumber) {
61-
this.minConcurrentReconciliationThreads = threadNumber;
65+
this.minConcurrentReconciliationThreads = Utils.ensureValid(threadNumber,
66+
"minimum reconciliation threads", ExecutorServiceManager.MIN_THREAD_NUMBER,
67+
original.minConcurrentReconciliationThreads());
6268
return this;
6369
}
6470

6571
public ConfigurationServiceOverrider withMinConcurrentWorkflowExecutorThreads(int threadNumber) {
66-
this.minConcurrentWorkflowExecutorThreads = threadNumber;
72+
this.minConcurrentWorkflowExecutorThreads = Utils.ensureValid(threadNumber,
73+
"minimum workflow execution threads", ExecutorServiceManager.MIN_THREAD_NUMBER,
74+
original.minConcurrentWorkflowExecutorThreads());
6775
return this;
6876
}
6977

@@ -150,14 +158,22 @@ public boolean checkCRDAndValidateLocalModel() {
150158

151159
@Override
152160
public int concurrentReconciliationThreads() {
153-
return concurrentReconciliationThreads != null ? concurrentReconciliationThreads
154-
: original.concurrentReconciliationThreads();
161+
return Utils.ensureValid(
162+
concurrentReconciliationThreads != null ? concurrentReconciliationThreads
163+
: original.concurrentReconciliationThreads(),
164+
"maximum reconciliation threads",
165+
minimumMaxValueFor(minConcurrentReconciliationThreads),
166+
original.concurrentReconciliationThreads());
155167
}
156168

157169
@Override
158170
public int concurrentWorkflowExecutorThreads() {
159-
return concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads
160-
: original.concurrentWorkflowExecutorThreads();
171+
return Utils.ensureValid(
172+
concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads
173+
: original.concurrentWorkflowExecutorThreads(),
174+
"maximum workflow execution threads",
175+
minimumMaxValueFor(minConcurrentWorkflowExecutorThreads),
176+
original.concurrentWorkflowExecutorThreads());
161177
}
162178

163179
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

+28-7
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import java.util.concurrent.ExecutorService;
99
import java.util.concurrent.Executors;
1010
import java.util.concurrent.Future;
11+
import java.util.concurrent.LinkedBlockingDeque;
12+
import java.util.concurrent.ThreadPoolExecutor;
1113
import java.util.concurrent.TimeUnit;
1214
import java.util.concurrent.TimeoutException;
1315
import java.util.function.Function;
@@ -20,16 +22,24 @@
2022
import io.javaoperatorsdk.operator.OperatorException;
2123

2224
public class ExecutorServiceManager {
25+
2326
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
24-
private final ExecutorService executor;
25-
private final ExecutorService workflowExecutor;
26-
private final ExecutorService cachingExecutorService;
27+
public static final int MIN_THREAD_NUMBER = 0;
28+
private ExecutorService executor;
29+
private ExecutorService workflowExecutor;
30+
private ExecutorService cachingExecutorService;
31+
private boolean started;
2732

2833
ExecutorServiceManager(ConfigurationService configurationService) {
29-
this.cachingExecutorService = Executors.newCachedThreadPool();
30-
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
31-
this.workflowExecutor =
32-
new InstrumentedExecutorService(configurationService.getWorkflowExecutorService());
34+
start(configurationService);
35+
}
36+
37+
public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) {
38+
minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER);
39+
maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1);
40+
41+
return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES,
42+
new LinkedBlockingDeque<>());
3343
}
3444

3545
/**
@@ -92,6 +102,16 @@ public ExecutorService cachingExecutorService() {
92102
return cachingExecutorService;
93103
}
94104

105+
public void start(ConfigurationService configurationService) {
106+
if (!started) {
107+
this.cachingExecutorService = Executors.newCachedThreadPool();
108+
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
109+
this.workflowExecutor =
110+
new InstrumentedExecutorService(configurationService.getWorkflowExecutorService());
111+
started = true;
112+
}
113+
}
114+
95115
public void stop(Duration gracefulShutdownTimeout) {
96116
try {
97117
var parallelExec = Executors.newFixedThreadPool(3);
@@ -100,6 +120,7 @@ public void stop(Duration gracefulShutdownTimeout) {
100120
shutdown(workflowExecutor, gracefulShutdownTimeout),
101121
shutdown(cachingExecutorService, gracefulShutdownTimeout)));
102122
parallelExec.shutdownNow();
123+
started = false;
103124
} catch (InterruptedException e) {
104125
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
105126
Thread.currentThread().interrupt();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java

+18
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,24 @@ public static Version loadFromProperties() {
7272
builtTime);
7373
}
7474

75+
public static int ensureValid(int value, String description, int minValue) {
76+
return ensureValid(value, description, minValue, minValue);
77+
}
78+
79+
public static int ensureValid(int value, String description, int minValue, int defaultValue) {
80+
if (value < minValue) {
81+
if (defaultValue < minValue) {
82+
throw new IllegalArgumentException(
83+
"Default value for " + description + " must be greater than " + minValue);
84+
}
85+
log.warn("Requested " + description + " should be greater than " + minValue + ". Requested: "
86+
+ value + ", using " + defaultValue + (defaultValue == minValue ? "" : " (default)") +
87+
" instead");
88+
value = defaultValue;
89+
}
90+
return value;
91+
}
92+
7593
@SuppressWarnings("unused")
7694
// this is used in the Quarkus extension
7795
public static boolean isValidateCustomResourcesEnvVarSet() {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

+21-21
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

33
import java.util.Optional;
4-
import java.util.concurrent.ExecutorService;
54
import java.util.concurrent.Executors;
65

76
import org.junit.jupiter.api.Test;
@@ -13,6 +12,7 @@
1312

1413
import com.fasterxml.jackson.databind.ObjectMapper;
1514

15+
import static org.junit.jupiter.api.Assertions.assertEquals;
1616
import static org.junit.jupiter.api.Assertions.assertNotEquals;
1717

1818
class ConfigurationServiceOverriderTest {
@@ -42,31 +42,11 @@ public Config getClientConfiguration() {
4242
return new ConfigBuilder().withNamespace("namespace").build();
4343
}
4444

45-
@Override
46-
public int concurrentReconciliationThreads() {
47-
return -1;
48-
}
49-
50-
@Override
51-
public int getTerminationTimeoutSeconds() {
52-
return -1;
53-
}
54-
5545
@Override
5646
public Metrics getMetrics() {
5747
return METRICS;
5848
}
5949

60-
@Override
61-
public ExecutorService getExecutorService() {
62-
return null;
63-
}
64-
65-
@Override
66-
public boolean closeClientOnStop() {
67-
return true;
68-
}
69-
7050
@Override
7151
public ObjectMapper getObjectMapper() {
7252
return OBJECT_MAPPER;
@@ -121,4 +101,24 @@ public <R extends HasMetadata> R clone(R object) {
121101
overridden.getLeaderElectionConfiguration());
122102
}
123103

104+
@Test
105+
void shouldReplaceInvalidValues() {
106+
final var original = new BaseConfigurationService();
107+
108+
final var service = ConfigurationService.newOverriddenConfigurationService(original,
109+
o -> o
110+
.withConcurrentReconciliationThreads(0)
111+
.withMinConcurrentReconciliationThreads(-1)
112+
.withConcurrentWorkflowExecutorThreads(2)
113+
.withMinConcurrentWorkflowExecutorThreads(3));
114+
115+
assertEquals(original.minConcurrentReconciliationThreads(),
116+
service.minConcurrentReconciliationThreads());
117+
assertEquals(original.concurrentReconciliationThreads(),
118+
service.concurrentReconciliationThreads());
119+
assertEquals(3, service.minConcurrentWorkflowExecutorThreads());
120+
assertEquals(original.concurrentWorkflowExecutorThreads(),
121+
service.concurrentWorkflowExecutorThreads());
122+
}
123+
124124
}

0 commit comments

Comments
 (0)