Skip to content

feat: handle clustered resource on secondary to primary mapper init #2313

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions .github/workflows/stale-issues-and-prs.yml

This file was deleted.

2 changes: 1 addition & 1 deletion bootstrapper-maven-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>java-operator-sdk</artifactId>
<groupId>io.javaoperatorsdk</groupId>
<version>4.8.1-SNAPSHOT</version>
<version>4.8.2-SNAPSHOT</version>
</parent>

<artifactId>bootstrapper</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion caffeine-bounded-cache-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>java-operator-sdk</artifactId>
<groupId>io.javaoperatorsdk</groupId>
<version>4.8.1-SNAPSHOT</version>
<version>4.8.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
2 changes: 1 addition & 1 deletion micrometer-support/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>java-operator-sdk</artifactId>
<groupId>io.javaoperatorsdk</groupId>
<version>4.8.1-SNAPSHOT</version>
<version>4.8.2-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
4 changes: 2 additions & 2 deletions operator-framework-bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework-bom</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.8.2-SNAPSHOT</version>
<name>Operator SDK - Bill of Materials</name>
<packaging>pom</packaging>
<description>Java SDK for implementing Kubernetes operators</description>
Expand Down Expand Up @@ -61,7 +61,7 @@

<properties>
<nexus-staging-maven-plugin.version>1.6.13</nexus-staging-maven-plugin.version>
<maven-gpg-plugin.version>3.1.0</maven-gpg-plugin.version>
<maven-gpg-plugin.version>3.2.1</maven-gpg-plugin.version>
<maven-source-plugin.version>3.3.0</maven-source-plugin.version>
<maven-javadoc-plugin.version>3.6.3</maven-javadoc-plugin.version>
</properties>
Expand Down
2 changes: 1 addition & 1 deletion operator-framework-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<parent>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>java-operator-sdk</artifactId>
<version>4.8.1-SNAPSHOT</version>
<version>4.8.2-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

import org.slf4j.Logger;
Expand All @@ -24,8 +25,6 @@
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory;

import static io.javaoperatorsdk.operator.api.config.ExecutorServiceManager.newThreadPoolExecutor;

/** An interface from which to retrieve configuration information. */
public interface ConfigurationService {

Expand Down Expand Up @@ -127,14 +126,18 @@ default boolean checkCRDAndValidateLocalModel() {
return false;
}

int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200;
int DEFAULT_RECONCILIATION_THREADS_NUMBER = 50;
/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;

/**
* The maximum number of threads the operator can spin out to dispatch reconciliation requests to
* reconcilers
* The number of threads the operator can spin out to dispatch reconciliation requests to
* reconcilers with the default executors
*
* @return the maximum number of concurrent reconciliation threads
* @return the number of concurrent reconciliation threads
*/
default int concurrentReconciliationThreads() {
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
Expand All @@ -143,17 +146,24 @@ default int concurrentReconciliationThreads() {
/**
* The minimum number of threads the operator starts in the thread pool for reconciliations.
*
* @deprecated not used anymore by default executor implementation
* @return the minimum number of concurrent reconciliation threads
*/
@Deprecated(forRemoval = true)
default int minConcurrentReconciliationThreads() {
return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;
}

int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;
/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;

/**
* Retrieves the maximum number of threads the operator can spin out to be used in the workflows.
* Number of threads the operator can spin out to be used in the workflows with the default
* executor.
*
* @return the maximum number of concurrent workflow threads
*/
Expand All @@ -164,8 +174,10 @@ default int concurrentWorkflowExecutorThreads() {
/**
* The minimum number of threads the operator starts in the thread pool for workflows.
*
* @deprecated not used anymore by default executor implementation
* @return the minimum number of concurrent workflow threads
*/
@Deprecated(forRemoval = true)
default int minConcurrentWorkflowExecutorThreads() {
return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
}
Expand All @@ -191,13 +203,11 @@ default Metrics getMetrics() {
}

default ExecutorService getExecutorService() {
return newThreadPoolExecutor(minConcurrentReconciliationThreads(),
concurrentReconciliationThreads());
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
}

default ExecutorService getWorkflowExecutorService() {
return newThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
concurrentWorkflowExecutorThreads());
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
}

default boolean closeClientOnStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,21 @@ public int concurrentWorkflowExecutorThreads() {
original.concurrentWorkflowExecutorThreads());
}

/**
* @deprecated Not used anymore in the default implementation
*/
@Deprecated(forRemoval = true)
@Override
public int minConcurrentReconciliationThreads() {
return minConcurrentReconciliationThreads != null ? minConcurrentReconciliationThreads
: original.minConcurrentReconciliationThreads();
}

/**
* @deprecated Not used anymore in the default implementation
*/
@Override
@Deprecated(forRemoval = true)
public int minConcurrentWorkflowExecutorThreads() {
return minConcurrentWorkflowExecutorThreads != null ? minConcurrentWorkflowExecutorThreads
: original.minConcurrentWorkflowExecutorThreads();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
Expand All @@ -35,14 +33,6 @@ public class ExecutorServiceManager {
start(configurationService);
}

public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) {
minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER);
maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1);

return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES,
new LinkedBlockingDeque<>());
}

/**
* Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic
* nature, in sense that won't grow with the number of inputs (thus kubernetes resources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Arrays;
import java.util.Date;
Expand Down Expand Up @@ -57,9 +56,7 @@ public static Version loadFromProperties() {
try {
String time = properties.getProperty("git.build.time");
if (time != null) {
builtTime =
// RFC 822 date is the default format used by git-commit-id-plugin
new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ").parse(time);
builtTime = Date.from(Instant.parse(time));
} else {
builtTime = Date.from(Instant.EPOCH);
}
Expand Down Expand Up @@ -122,9 +119,13 @@ static boolean getBooleanFromSystemPropsOrDefault(String propertyName, boolean d
}

public static Class<?> getFirstTypeArgumentFromExtendedClass(Class<?> clazz) {
return getTypeArgumentFromExtendedClassByIndex(clazz, 0);
}

public static Class<?> getTypeArgumentFromExtendedClassByIndex(Class<?> clazz, int index) {
try {
Type type = clazz.getGenericSuperclass();
return (Class<?>) ((ParameterizedType) type).getActualTypeArguments()[0];
return (Class<?>) ((ParameterizedType) type).getActualTypeArguments()[index];
} catch (Exception e) {
throw new RuntimeException(GENERIC_PARAMETER_TYPE_ERROR_PREFIX
+ clazz.getSimpleName()
Expand Down Expand Up @@ -193,27 +194,31 @@ private static Optional<? extends Class<?>> extractType(Class<?> clazz,

public static Class<?> getFirstTypeArgumentFromSuperClassOrInterface(Class<?> clazz,
Class<?> expectedImplementedInterface) {
return getTypeArgumentFromSuperClassOrInterfaceByIndex(clazz, expectedImplementedInterface, 0);
}

public static Class<?> getTypeArgumentFromSuperClassOrInterfaceByIndex(Class<?> clazz,
Class<?> expectedImplementedInterface, int index) {
// first check super class if it exists
try {
final Class<?> superclass = clazz.getSuperclass();
if (!superclass.equals(Object.class)) {
try {
return getFirstTypeArgumentFromExtendedClass(clazz);
return getTypeArgumentFromExtendedClassByIndex(clazz, index);
} catch (Exception e) {
// try interfaces
try {
return getFirstTypeArgumentFromInterface(clazz, expectedImplementedInterface);
return getTypeArgumentFromInterfaceByIndex(clazz, expectedImplementedInterface, index);
} catch (Exception ex) {
// try on the parent
return getFirstTypeArgumentFromSuperClassOrInterface(superclass,
expectedImplementedInterface);
return getTypeArgumentFromSuperClassOrInterfaceByIndex(superclass,
expectedImplementedInterface, index);
}
}
}
return getFirstTypeArgumentFromInterface(clazz, expectedImplementedInterface);
return getTypeArgumentFromInterfaceByIndex(clazz, expectedImplementedInterface, index);
} catch (Exception e) {
throw new OperatorException(
GENERIC_PARAMETER_TYPE_ERROR_PREFIX + clazz.getSimpleName(), e);
throw new OperatorException(GENERIC_PARAMETER_TYPE_ERROR_PREFIX + clazz.getSimpleName(), e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@

import io.fabric8.kubernetes.api.model.GenericKubernetesResource;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.Utils;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.processing.GroupVersionKind;

public class GenericKubernetesDependentResource<P extends HasMetadata>
extends KubernetesDependentResource<GenericKubernetesResource, P> {

private GroupVersionKind groupVersionKind;
private final GroupVersionKind groupVersionKind;

public GenericKubernetesDependentResource(GroupVersionKind groupVersionKind) {
super(GenericKubernetesResource.class);
Expand All @@ -19,6 +20,13 @@ protected InformerConfiguration.InformerConfigurationBuilder<GenericKubernetesRe
return InformerConfiguration.from(groupVersionKind);
}

@SuppressWarnings("unchecked")
@Override
protected Class<P> getPrimaryResourceType() {
return (Class<P>) Utils.getFirstTypeArgumentFromExtendedClass(getClass());
}

@SuppressWarnings("unused")
public GroupVersionKind getGroupVersionKind() {
return groupVersionKind;
}
Expand Down
Loading
Loading