Skip to content

feat: decouple from ObjectMapper #1953

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jun 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

import io.fabric8.kubernetes.api.model.authorization.v1.SelfSubjectRulesReview;
import io.fabric8.kubernetes.api.model.authorization.v1.SelfSubjectRulesReviewSpecBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectionConfig;
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElector;
Expand All @@ -32,14 +31,11 @@ public class LeaderElectionManager {
private final ControllerManager controllerManager;
private String identity;
private CompletableFuture<?> leaderElectionFuture;
private KubernetesClient kubernetesClient;
private final ConfigurationService configurationService;
private String leaseNamespace;

public LeaderElectionManager(KubernetesClient kubernetesClient,
ControllerManager controllerManager,
LeaderElectionManager(ControllerManager controllerManager,
ConfigurationService configurationService) {
this.kubernetesClient = kubernetesClient;
this.controllerManager = controllerManager;
this.configurationService = configurationService;
}
Expand All @@ -52,7 +48,7 @@ private void init(LeaderElectionConfiguration config) {
this.identity = identity(config);
leaseNamespace =
config.getLeaseNamespace().orElseGet(
() -> configurationService.getClientConfiguration().getNamespace());
() -> configurationService.getKubernetesClient().getConfiguration().getNamespace());
if (leaseNamespace == null) {
final var message =
"Lease namespace is not set and cannot be inferred. Leader election cannot continue.";
Expand All @@ -62,7 +58,8 @@ private void init(LeaderElectionConfiguration config) {
final var lock = new LeaseLock(leaseNamespace, config.getLeaseName(), identity);
// releaseOnCancel is not used in the underlying implementation
leaderElector = new LeaderElectorBuilder(
kubernetesClient, configurationService.getExecutorServiceManager().cachingExecutorService())
configurationService.getKubernetesClient(),
configurationService.getExecutorServiceManager().cachingExecutorService())
.withConfig(
new LeaderElectionConfig(
lock,
Expand Down Expand Up @@ -122,7 +119,7 @@ private void checkLeaseAccess() {
var verbs = Arrays.asList("create", "update", "get");
SelfSubjectRulesReview review = new SelfSubjectRulesReview();
review.setSpec(new SelfSubjectRulesReviewSpecBuilder().withNamespace(leaseNamespace).build());
var reviewResult = kubernetesClient.resource(review).create();
var reviewResult = configurationService.getKubernetesClient().resource(review).create();
log.debug("SelfSubjectRulesReview result: {}", reviewResult);
var foundRule = reviewResult.getStatus().getResourceRules().stream()
.filter(rule -> rule.getApiGroups().contains(COORDINATION_GROUP)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.Version;
import io.javaoperatorsdk.operator.api.config.BaseConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
Expand All @@ -26,7 +23,7 @@
@SuppressWarnings("rawtypes")
public class Operator implements LifecycleAware {
private static final Logger log = LoggerFactory.getLogger(Operator.class);
private static final int DEFAULT_MAX_CONCURRENT_REQUEST = 512;

private final KubernetesClient kubernetesClient;
private final ControllerManager controllerManager;
private final LeaderElectionManager leaderElectionManager;
Expand All @@ -38,49 +35,56 @@ public Operator() {
this((KubernetesClient) null);
}

public Operator(KubernetesClient kubernetesClient) {
this(kubernetesClient, new BaseConfigurationService());
Operator(KubernetesClient kubernetesClient) {
this(kubernetesClient, null);
}

/**
* @param configurationService implementation
* @deprecated Use {@link #Operator(Consumer)} instead
*/
@Deprecated(forRemoval = true)
@SuppressWarnings("unused")
public Operator(ConfigurationService configurationService) {
this(null, configurationService);
this(null, null);
}

public Operator(Consumer<ConfigurationServiceOverrider> overrider) {
this(null, overrider);
}

public Operator(KubernetesClient client, Consumer<ConfigurationServiceOverrider> overrider) {
this(client, ConfigurationService
.newOverriddenConfigurationService(new BaseConfigurationService(), overrider));
}

/**
* Note that Operator by default closes the client on stop, this can be changed using
* {@link ConfigurationService}
*
* @param kubernetesClient client to use to all Kubernetes related operations
* @param configurationService provides configuration
* @param client client to use to all Kubernetes related operations
* @param overrider a {@link ConfigurationServiceOverrider} consumer used to override the default
* {@link ConfigurationService} values
* @deprecated Use {@link Operator#Operator(Consumer)} instead, passing your custom client with
* {@link ConfigurationServiceOverrider#withKubernetesClient(KubernetesClient)}
*/
public Operator(KubernetesClient kubernetesClient, ConfigurationService configurationService) {
this.configurationService = configurationService;
@Deprecated
public Operator(KubernetesClient client, Consumer<ConfigurationServiceOverrider> overrider) {
// initialize the client if the user didn't provide one
if (client == null) {
var configurationService = ConfigurationService.newOverriddenConfigurationService(overrider);
client = configurationService.getKubernetesClient();
}

this.kubernetesClient = client;

// override the configuration service to use the same client
if (overrider != null) {
overrider = overrider.andThen(o -> o.withKubernetesClient(this.kubernetesClient));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be good to have unit/integration tests for these setting parts. Also shouldn't we deprecate the client as param if it is also in ConfigurationService?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand your last point. Regarding unit tests, this is far from finished. I'm not going to move forward with this unless we agree on the approach to take first.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, np, just a remark

} else {
overrider = o -> o.withKubernetesClient(this.kubernetesClient);
}
this.configurationService = ConfigurationService.newOverriddenConfigurationService(overrider);

final var executorServiceManager = configurationService.getExecutorServiceManager();
controllerManager = new ControllerManager(executorServiceManager);
this.kubernetesClient =
kubernetesClient != null ? kubernetesClient
: new KubernetesClientBuilder()
.withConfig(new ConfigBuilder()
.withMaxConcurrentRequests(DEFAULT_MAX_CONCURRENT_REQUEST).build())
.build();


leaderElectionManager =
new LeaderElectionManager(kubernetesClient, controllerManager, configurationService);
leaderElectionManager = new LeaderElectionManager(controllerManager, configurationService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,38 @@
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;

import com.fasterxml.jackson.databind.ObjectMapper;

@SuppressWarnings("rawtypes")
public class AbstractConfigurationService implements ConfigurationService {
private final Map<String, ControllerConfiguration> configurations = new ConcurrentHashMap<>();
private final Version version;
private Cloner cloner;
private ObjectMapper mapper;
private ExecutorServiceManager executorServiceManager;

public AbstractConfigurationService(Version version) {
this(version, null, null, null);
this(version, null, null);
}

public AbstractConfigurationService(Version version, Cloner cloner) {
this(version, cloner, null, null);
this(version, cloner, null);
}

public AbstractConfigurationService(Version version, Cloner cloner, ObjectMapper mapper,
public AbstractConfigurationService(Version version, Cloner cloner,
ExecutorServiceManager executorServiceManager) {
this.version = version;
init(cloner, mapper, executorServiceManager);
init(cloner, executorServiceManager);
}

/**
* Subclasses can call this method to more easily initialize the {@link Cloner}
* {@link ObjectMapper} and {@link ExecutorServiceManager} associated with this
* ConfigurationService implementation. This is useful in situations where the cloner depends on a
* mapper that might require additional configuration steps before it's ready to be used.
* Subclasses can call this method to more easily initialize the {@link Cloner} and
* {@link ExecutorServiceManager} associated with this ConfigurationService implementation. This
* is useful in situations where the cloner depends on a mapper that might require additional
* configuration steps before it's ready to be used.
*
* @param cloner the {@link Cloner} instance to be used
* @param mapper the {@link ObjectMapper} instance to be used
* @param executorServiceManager the {@link ExecutorServiceManager} instance to be used
*/
protected void init(Cloner cloner, ObjectMapper mapper,
ExecutorServiceManager executorServiceManager) {
protected void init(Cloner cloner, ExecutorServiceManager executorServiceManager) {
this.cloner = cloner != null ? cloner : ConfigurationService.super.getResourceCloner();
this.mapper = mapper != null ? mapper : ConfigurationService.super.getObjectMapper();
this.executorServiceManager = executorServiceManager;
}

Expand Down Expand Up @@ -133,11 +127,6 @@ public Cloner getResourceCloner() {
return cloner;
}

@Override
public ObjectMapper getObjectMapper() {
return mapper;
}

@Override
public ExecutorServiceManager getExecutorServiceManager() {
// lazy init to avoid initializing thread pools for nothing in an overriding scenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,6 @@
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.retry.Retry;

import com.fasterxml.jackson.databind.ObjectMapper;

import static io.javaoperatorsdk.operator.api.config.ControllerConfiguration.CONTROLLER_NAME_AS_FIELD_MANAGER;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;

Expand All @@ -45,10 +43,6 @@ public BaseConfigurationService(Version version) {
super(version);
}

public BaseConfigurationService(Version version, Cloner cloner, ObjectMapper mapper) {
super(version, cloner, mapper, null);
}

public BaseConfigurationService(Version version, Cloner cloner) {
super(version, cloner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,25 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.utils.Serialization;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceFactory;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor;

/** An interface from which to retrieve configuration information. */
public interface ConfigurationService {

Logger log = LoggerFactory.getLogger(ConfigurationService.class);

int DEFAULT_MAX_CONCURRENT_REQUEST = 512;

/**
* Retrieves the configuration associated with the specified reconciler
*
Expand All @@ -38,14 +40,30 @@ public interface ConfigurationService {
*/
<R extends HasMetadata> ControllerConfiguration<R> getConfigurationFor(Reconciler<R> reconciler);


/**
* Retrieves the Kubernetes client configuration
* Used to clone custom resources. It is strongly suggested that implementors override this method
* since the default implementation creates a new {@link Cloner} instance each time this method is
* called.
*
* @return the configuration of the Kubernetes client, defaulting to the provided
* auto-configuration
* @return the configured {@link Cloner}
*/
default Config getClientConfiguration() {
return Config.autoConfigure(null);
default Cloner getResourceCloner() {
return new Cloner() {
@Override
public <R extends HasMetadata> R clone(R object) {
return getKubernetesClient().getKubernetesSerialization().clone(object);
}
};
}

default KubernetesClient getKubernetesClient() {
return new KubernetesClientBuilder()
.withConfig(new ConfigBuilder(Config.autoConfigure(null))
.withMaxConcurrentRequests(DEFAULT_MAX_CONCURRENT_REQUEST)
.build())
.withKubernetesSerialization(new KubernetesSerialization())
.build();
}

/**
Expand Down Expand Up @@ -120,28 +138,6 @@ default int minConcurrentWorkflowExecutorThreads() {
return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
}

/**
* Used to clone custom resources. It is strongly suggested that implementors override this method
* since the default implementation creates a new {@link Cloner} instance each time this method is
* called.
*
* @return the configured {@link Cloner}
*/
default Cloner getResourceCloner() {
return new Cloner() {
@SuppressWarnings("unchecked")
@Override
public HasMetadata clone(HasMetadata object) {
try {
final var mapper = getObjectMapper();
return mapper.readValue(mapper.writeValueAsString(object), object.getClass());
} catch (JsonProcessingException e) {
throw new IllegalStateException(e);
}
}
};
}

int DEFAULT_TERMINATION_TIMEOUT_SECONDS = 10;

/**
Expand Down Expand Up @@ -176,10 +172,6 @@ default boolean closeClientOnStop() {
return true;
}

default ObjectMapper getObjectMapper() {
return Serialization.jsonMapper();
}

@SuppressWarnings("rawtypes")
default DependentResourceFactory dependentResourceFactory() {
return DependentResourceFactory.DEFAULT;
Expand Down Expand Up @@ -261,6 +253,11 @@ static ConfigurationService newOverriddenConfigurationService(
return baseConfiguration;
}

static ConfigurationService newOverriddenConfigurationService(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This feels little smelly, since in an abstraction we creating an overrode of specific implementation, is this really needed?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, just noticing that the overriding method is almost always called passing it a new BaseConfigurationService instance. Also, I think it makes it easier for users because they might not know which instance to use to override the defaults. Open to cleaning this up further.

Consumer<ConfigurationServiceOverrider> overrider) {
return newOverriddenConfigurationService(new BaseConfigurationService(), overrider);
}

default ExecutorServiceManager getExecutorServiceManager() {
return new ExecutorServiceManager(this);
}
Expand Down
Loading