Skip to content

fix: enforce valid values when creating ExecutorServices #1891

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,13 @@ public synchronized void start() {
version.getSdkVersion(),
version.getCommit(),
version.getBuiltTime());

final var clientVersion = Version.clientVersion();
log.info("Client version: {}", clientVersion);

// need to create new thread pools if we're restarting because they've been shut down when we
// previously stopped
configurationService.getExecutorServiceManager().start(configurationService);

// first start the controller manager before leader election,
// the leader election would start subsequently the processor if on
controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,32 +17,37 @@ public class AbstractConfigurationService implements ConfigurationService {
private final Version version;
private Cloner cloner;
private ObjectMapper mapper;
private ExecutorServiceManager executorServiceManager;

public AbstractConfigurationService(Version version) {
this(version, null, null);
this(version, null, null, null);
}

public AbstractConfigurationService(Version version, Cloner cloner) {
this(version, cloner, null);
this(version, cloner, null, null);
}

public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) {
public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper,
ExecutorServiceManager executorServiceManager) {
this.version = version;
init(cloner, mapper);
init(cloner, mapper, executorServiceManager);
}

/**
* Subclasses can call this method to more easily initialize the {@link Cloner} and
* {@link ObjectMapper} associated with this ConfigurationService implementation. This is useful
* in situations where the cloner depends on a mapper that might require additional configuration
* steps before it's ready to be used.
* Subclasses can call this method to more easily initialize the {@link Cloner}
* {@link ObjectMapper} and {@link ExecutorServiceManager} associated with this
* ConfigurationService implementation. This is useful in situations where the cloner depends on a
* mapper that might require additional configuration steps before it's ready to be used.
*
* @param cloner the {@link Cloner} instance to be used
* @param mapper the {@link ObjectMapper} instance to be used
* @param executorServiceManager the {@link ExecutorServiceManager} instance to be used
*/
protected void init(Cloner cloner, ObjectMapper mapper) {
protected void init(Cloner cloner, ObjectMapper mapper,
ExecutorServiceManager executorServiceManager) {
this.cloner = cloner != null ? cloner : ConfigurationService.super.getResourceCloner();
this.mapper = mapper != null ? mapper : ConfigurationService.super.getObjectMapper();
this.executorServiceManager = executorServiceManager;
}

protected <R extends HasMetadata> void register(ControllerConfiguration<R> config) {
Expand Down Expand Up @@ -132,4 +137,13 @@ public Cloner getResourceCloner() {
public ObjectMapper getObjectMapper() {
return mapper;
}

@Override
public ExecutorServiceManager getExecutorServiceManager() {
// lazy init to avoid initializing thread pools for nothing in an overriding scenario
if (executorServiceManager == null) {
executorServiceManager = ConfigurationService.super.getExecutorServiceManager();
}
return executorServiceManager;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public BaseConfigurationService(Version version) {
}

public BaseConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) {
super(version, cloner, mapper);
super(version, cloner, mapper, null);
}

public BaseConfigurationService(Version version, Cloner cloner) {
Expand All @@ -62,6 +62,7 @@ protected void logMissingReconcilerWarning(String reconcilerKey, String reconcil
reconcilersNameMessage);
}

@SuppressWarnings("unused")
public String getLoggerName() {
return LOGGER_NAME;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.slf4j.Logger;
Expand All @@ -24,6 +21,8 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor;

/** An interface from which to retrieve configuration information. */
public interface ConfigurationService {

Expand Down Expand Up @@ -164,15 +163,13 @@ default Metrics getMetrics() {
}

default ExecutorService getExecutorService() {
return new ThreadPoolExecutor(minConcurrentReconciliationThreads(),
concurrentReconciliationThreads(),
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
return newThreadPoolExecutor(minConcurrentReconciliationThreads(),
concurrentReconciliationThreads());
}

default ExecutorService getWorkflowExecutorService() {
return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
concurrentWorkflowExecutorThreads(),
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
concurrentWorkflowExecutorThreads());
}

default boolean closeClientOnStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,21 @@ public ConfigurationServiceOverrider withConcurrentWorkflowExecutorThreads(int t
return this;
}

private int minimumMaxValueFor(Integer minValue) {
return minValue != null ? (minValue < 0 ? 0 : minValue) + 1 : 1;
}

public ConfigurationServiceOverrider withMinConcurrentReconciliationThreads(int threadNumber) {
this.minConcurrentReconciliationThreads = threadNumber;
this.minConcurrentReconciliationThreads = Utils.ensureValid(threadNumber,
"minimum reconciliation threads", ExecutorServiceManager.MIN_THREAD_NUMBER,
original.minConcurrentReconciliationThreads());
return this;
}

public ConfigurationServiceOverrider withMinConcurrentWorkflowExecutorThreads(int threadNumber) {
this.minConcurrentWorkflowExecutorThreads = threadNumber;
this.minConcurrentWorkflowExecutorThreads = Utils.ensureValid(threadNumber,
"minimum workflow execution threads", ExecutorServiceManager.MIN_THREAD_NUMBER,
original.minConcurrentWorkflowExecutorThreads());
return this;
}

Expand Down Expand Up @@ -150,14 +158,22 @@ public boolean checkCRDAndValidateLocalModel() {

@Override
public int concurrentReconciliationThreads() {
return concurrentReconciliationThreads != null ? concurrentReconciliationThreads
: original.concurrentReconciliationThreads();
return Utils.ensureValid(
concurrentReconciliationThreads != null ? concurrentReconciliationThreads
: original.concurrentReconciliationThreads(),
"maximum reconciliation threads",
minimumMaxValueFor(minConcurrentReconciliationThreads),
original.concurrentReconciliationThreads());
}

@Override
public int concurrentWorkflowExecutorThreads() {
return concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads
: original.concurrentWorkflowExecutorThreads();
return Utils.ensureValid(
concurrentWorkflowExecutorThreads != null ? concurrentWorkflowExecutorThreads
: original.concurrentWorkflowExecutorThreads(),
"maximum workflow execution threads",
minimumMaxValueFor(minConcurrentWorkflowExecutorThreads),
original.concurrentWorkflowExecutorThreads());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand All @@ -20,16 +22,24 @@
import io.javaoperatorsdk.operator.OperatorException;

public class ExecutorServiceManager {

private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
private final ExecutorService cachingExecutorService;
public static final int MIN_THREAD_NUMBER = 0;
private ExecutorService executor;
private ExecutorService workflowExecutor;
private ExecutorService cachingExecutorService;
private boolean started;

ExecutorServiceManager(ConfigurationService configurationService) {
this.cachingExecutorService = Executors.newCachedThreadPool();
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
this.workflowExecutor =
new InstrumentedExecutorService(configurationService.getWorkflowExecutorService());
start(configurationService);
}

public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) {
minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER);
maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1);

return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES,
new LinkedBlockingDeque<>());
}

/**
Expand Down Expand Up @@ -92,6 +102,16 @@ public ExecutorService cachingExecutorService() {
return cachingExecutorService;
}

public void start(ConfigurationService configurationService) {
if (!started) {
this.cachingExecutorService = Executors.newCachedThreadPool();
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
this.workflowExecutor =
new InstrumentedExecutorService(configurationService.getWorkflowExecutorService());
started = true;
}
}

public void stop(Duration gracefulShutdownTimeout) {
try {
var parallelExec = Executors.newFixedThreadPool(3);
Expand All @@ -100,6 +120,7 @@ public void stop(Duration gracefulShutdownTimeout) {
shutdown(workflowExecutor, gracefulShutdownTimeout),
shutdown(cachingExecutorService, gracefulShutdownTimeout)));
parallelExec.shutdownNow();
started = false;
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ public static Version loadFromProperties() {
builtTime);
}

public static int ensureValid(int value, String description, int minValue) {
return ensureValid(value, description, minValue, minValue);
}

public static int ensureValid(int value, String description, int minValue, int defaultValue) {
if (value < minValue) {
if (defaultValue < minValue) {
throw new IllegalArgumentException(
"Default value for " + description + " must be greater than " + minValue);
}
log.warn("Requested " + description + " should be greater than " + minValue + ". Requested: "
+ value + ", using " + defaultValue + (defaultValue == minValue ? "" : " (default)") +
" instead");
value = defaultValue;
}
return value;
}

@SuppressWarnings("unused")
// this is used in the Quarkus extension
public static boolean isValidateCustomResourcesEnvVarSet() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.junit.jupiter.api.Test;
Expand All @@ -13,6 +12,7 @@

import com.fasterxml.jackson.databind.ObjectMapper;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;

class ConfigurationServiceOverriderTest {
Expand Down Expand Up @@ -42,31 +42,11 @@ public Config getClientConfiguration() {
return new ConfigBuilder().withNamespace("namespace").build();
}

@Override
public int concurrentReconciliationThreads() {
return -1;
}

@Override
public int getTerminationTimeoutSeconds() {
return -1;
}

@Override
public Metrics getMetrics() {
return METRICS;
}

@Override
public ExecutorService getExecutorService() {
return null;
}

@Override
public boolean closeClientOnStop() {
return true;
}

@Override
public ObjectMapper getObjectMapper() {
return OBJECT_MAPPER;
Expand Down Expand Up @@ -121,4 +101,24 @@ public <R extends HasMetadata> R clone(R object) {
overridden.getLeaderElectionConfiguration());
}

@Test
void shouldReplaceInvalidValues() {
final var original = new BaseConfigurationService();

final var service = ConfigurationService.newOverriddenConfigurationService(original,
o -> o
.withConcurrentReconciliationThreads(0)
.withMinConcurrentReconciliationThreads(-1)
.withConcurrentWorkflowExecutorThreads(2)
.withMinConcurrentWorkflowExecutorThreads(3));

assertEquals(original.minConcurrentReconciliationThreads(),
service.minConcurrentReconciliationThreads());
assertEquals(original.concurrentReconciliationThreads(),
service.concurrentReconciliationThreads());
assertEquals(3, service.minConcurrentWorkflowExecutorThreads());
assertEquals(original.concurrentWorkflowExecutorThreads(),
service.concurrentWorkflowExecutorThreads());
}

}