Skip to content

Commit f51c65b

Browse files
csvirimetacosm
andauthored
fix: primary cache utils mechanism (#2814)
Reading from API server was not correct, this works in all cases only if the informer cache has the up to date resources. If we don't have the up to date resource in the cache, and don't do the update based on that, we cannot say for sure if we can remove the resource for the next event or not from overlay cache. Signed-off-by: Attila Mészáros <[email protected]> Signed-off-by: Chris Laprun <[email protected]> Co-authored-by: Chris Laprun <[email protected]>
1 parent d82d51d commit f51c65b

File tree

8 files changed

+126
-41
lines changed

8 files changed

+126
-41
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java

Lines changed: 51 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

3+
import java.time.LocalTime;
4+
import java.time.temporal.ChronoUnit;
35
import java.util.function.UnaryOperator;
46

57
import org.slf4j.Logger;
@@ -25,6 +27,8 @@
2527
public class PrimaryUpdateAndCacheUtils {
2628

2729
public static final int DEFAULT_MAX_RETRY = 10;
30+
public static final int DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS = 10000;
31+
public static final int DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS = 50;
2832

2933
private PrimaryUpdateAndCacheUtils() {}
3034

@@ -90,8 +94,10 @@ public static <P extends HasMetadata> P ssaPatchStatusAndCacheResource(
9094
}
9195

9296
/**
93-
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator,
94-
* int)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY}.
97+
* Same as {@link #updateAndCacheResource(HasMetadata, Context, UnaryOperator, UnaryOperator, int,
98+
* long,long)} using the default maximum retry number as defined by {@link #DEFAULT_MAX_RETRY} and
99+
* default cache maximum polling time and period as defined, respectively by {@link
100+
* #DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS} and {@link #DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS}.
95101
*
96102
* @param resourceToUpdate original resource to update
97103
* @param context of reconciliation
@@ -106,7 +112,13 @@ public static <P extends HasMetadata> P updateAndCacheResource(
106112
UnaryOperator<P> modificationFunction,
107113
UnaryOperator<P> updateMethod) {
108114
return updateAndCacheResource(
109-
resourceToUpdate, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY);
115+
resourceToUpdate,
116+
context,
117+
modificationFunction,
118+
updateMethod,
119+
DEFAULT_MAX_RETRY,
120+
DEFAULT_RESOURCE_CACHE_TIMEOUT_MILLIS,
121+
DEFAULT_RESOURCE_CACHE_POLL_PERIOD_MILLIS);
110122
}
111123

112124
/**
@@ -124,16 +136,20 @@ public static <P extends HasMetadata> P updateAndCacheResource(
124136
* @param modificationFunction modifications to make on primary
125137
* @param updateMethod the update method implementation
126138
* @param maxRetry maximum number of retries before giving up
139+
* @param cachePollTimeoutMillis maximum amount of milliseconds to wait for the updated resource
140+
* to appear in cache
141+
* @param cachePollPeriodMillis cache polling period, in milliseconds
127142
* @param <P> primary type
128143
* @return the updated resource
129144
*/
130-
@SuppressWarnings("unchecked")
131145
public static <P extends HasMetadata> P updateAndCacheResource(
132146
P resourceToUpdate,
133147
Context<P> context,
134148
UnaryOperator<P> modificationFunction,
135149
UnaryOperator<P> updateMethod,
136-
int maxRetry) {
150+
int maxRetry,
151+
long cachePollTimeoutMillis,
152+
long cachePollPeriodMillis) {
137153

138154
if (log.isDebugEnabled()) {
139155
log.debug("Conflict retrying update for: {}", ResourceID.fromResource(resourceToUpdate));
@@ -180,14 +196,37 @@ public static <P extends HasMetadata> P updateAndCacheResource(
180196
resourceToUpdate.getMetadata().getNamespace(),
181197
e.getCode());
182198
resourceToUpdate =
183-
(P)
184-
context
185-
.getClient()
186-
.resources(resourceToUpdate.getClass())
187-
.inNamespace(resourceToUpdate.getMetadata().getNamespace())
188-
.withName(resourceToUpdate.getMetadata().getName())
189-
.get();
199+
pollLocalCache(
200+
context, resourceToUpdate, cachePollTimeoutMillis, cachePollPeriodMillis);
201+
}
202+
}
203+
}
204+
205+
private static <P extends HasMetadata> P pollLocalCache(
206+
Context<P> context, P staleResource, long timeoutMillis, long pollDelayMillis) {
207+
try {
208+
var resourceId = ResourceID.fromResource(staleResource);
209+
var startTime = LocalTime.now();
210+
final var timeoutTime = startTime.plus(timeoutMillis, ChronoUnit.MILLIS);
211+
while (timeoutTime.isAfter(LocalTime.now())) {
212+
log.debug("Polling cache for resource: {}", resourceId);
213+
var cachedResource = context.getPrimaryCache().get(resourceId).orElseThrow();
214+
if (!cachedResource
215+
.getMetadata()
216+
.getResourceVersion()
217+
.equals(staleResource.getMetadata().getResourceVersion())) {
218+
return context
219+
.getControllerConfiguration()
220+
.getConfigurationService()
221+
.getResourceCloner()
222+
.clone(cachedResource);
223+
}
224+
Thread.sleep(pollDelayMillis);
190225
}
226+
throw new OperatorException("Timeout of resource polling from cache for resource");
227+
} catch (InterruptedException e) {
228+
Thread.currentThread().interrupt();
229+
throw new OperatorException(e);
191230
}
192231
}
193232
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,7 @@ public synchronized void putResource(T newResource, String previousResourceVersi
126126
knownResourceVersions.add(newResource.getMetadata().getResourceVersion());
127127
}
128128
var resourceId = ResourceID.fromResource(newResource);
129-
var cachedResource =
130-
getResourceFromCache(resourceId)
131-
.orElse(managedInformerEventSource.get(resourceId).orElse(null));
129+
var cachedResource = managedInformerEventSource.get(resourceId).orElse(null);
132130

133131
boolean moveAhead = false;
134132
if (previousResourceVersion == null && cachedResource == null) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtilsTest.java

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,22 @@
11
package io.javaoperatorsdk.operator.api.reconciler;
22

3+
import java.util.Optional;
34
import java.util.function.UnaryOperator;
45

56
import org.junit.jupiter.api.BeforeEach;
67
import org.junit.jupiter.api.Test;
78

9+
import io.fabric8.kubernetes.api.model.HasMetadata;
810
import io.fabric8.kubernetes.client.KubernetesClient;
911
import io.fabric8.kubernetes.client.KubernetesClientException;
1012
import io.fabric8.kubernetes.client.dsl.MixedOperation;
1113
import io.fabric8.kubernetes.client.dsl.Resource;
14+
import io.fabric8.kubernetes.client.utils.KubernetesSerialization;
1215
import io.javaoperatorsdk.operator.OperatorException;
1316
import io.javaoperatorsdk.operator.TestUtils;
17+
import io.javaoperatorsdk.operator.api.config.Cloner;
18+
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
19+
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1420
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
1521
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource;
1622
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
@@ -29,6 +35,7 @@ class PrimaryUpdateAndCacheUtilsTest {
2935
Context<TestCustomResource> context = mock(Context.class);
3036
KubernetesClient client = mock(KubernetesClient.class);
3137
Resource resource = mock(Resource.class);
38+
IndexedResourceCache<TestCustomResource> primaryCache = mock(IndexedResourceCache.class);
3239

3340
@BeforeEach
3441
void setup() {
@@ -41,6 +48,20 @@ void setup() {
4148
when(mixedOp.inNamespace(any())).thenReturn(mixedOp);
4249
when(mixedOp.withName(any())).thenReturn(resource);
4350
when(resource.get()).thenReturn(TestUtils.testCustomResource1());
51+
when(context.getPrimaryCache()).thenReturn(primaryCache);
52+
53+
var controllerConfiguration = mock(ControllerConfiguration.class);
54+
when(context.getControllerConfiguration()).thenReturn(controllerConfiguration);
55+
var configService = mock(ConfigurationService.class);
56+
when(controllerConfiguration.getConfigurationService()).thenReturn(configService);
57+
when(configService.getResourceCloner())
58+
.thenReturn(
59+
new Cloner() {
60+
@Override
61+
public <R extends HasMetadata> R clone(R object) {
62+
return new KubernetesSerialization().clone(object);
63+
}
64+
});
4465
}
4566

4667
@Test
@@ -76,6 +97,10 @@ void retriesConflicts() {
7697
when(updateOperation.apply(any()))
7798
.thenThrow(new KubernetesClientException("", 409, null))
7899
.thenReturn(TestUtils.testCustomResource1());
100+
var freshResource = TestUtils.testCustomResource1();
101+
102+
freshResource.getMetadata().setResourceVersion("2");
103+
when(primaryCache.get(any())).thenReturn(Optional.of(freshResource));
79104

80105
var updated =
81106
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
@@ -89,15 +114,21 @@ void retriesConflicts() {
89114
updateOperation);
90115

91116
assertThat(updated).isNotNull();
92-
verify(resource, times(1)).get();
117+
verify(primaryCache, times(1)).get(any());
93118
}
94119

95120
@Test
96121
void throwsIfRetryExhausted() {
97122
var updateOperation = mock(UnaryOperator.class);
98123

99124
when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
125+
var stubbing = when(primaryCache.get(any()));
100126

127+
for (int i = 0; i < DEFAULT_MAX_RETRY; i++) {
128+
var resource = TestUtils.testCustomResource1();
129+
resource.getMetadata().setResourceVersion("" + i);
130+
stubbing = stubbing.thenReturn(Optional.of(resource));
131+
}
101132
assertThrows(
102133
OperatorException.class,
103134
() ->
@@ -106,6 +137,28 @@ void throwsIfRetryExhausted() {
106137
context,
107138
UnaryOperator.identity(),
108139
updateOperation));
109-
verify(resource, times(DEFAULT_MAX_RETRY)).get();
140+
verify(primaryCache, times(DEFAULT_MAX_RETRY)).get(any());
141+
}
142+
143+
@Test
144+
void cachePollTimeouts() {
145+
var updateOperation = mock(UnaryOperator.class);
146+
147+
when(updateOperation.apply(any())).thenThrow(new KubernetesClientException("", 409, null));
148+
when(primaryCache.get(any())).thenReturn(Optional.of(TestUtils.testCustomResource1()));
149+
150+
var ex =
151+
assertThrows(
152+
OperatorException.class,
153+
() ->
154+
PrimaryUpdateAndCacheUtils.updateAndCacheResource(
155+
TestUtils.testCustomResource1(),
156+
context,
157+
UnaryOperator.identity(),
158+
updateOperation,
159+
2,
160+
50L,
161+
10L));
162+
assertThat(ex.getMessage()).contains("Timeout");
110163
}
111164
}
Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,5 @@
99
@Group("sample.javaoperatorsdk")
1010
@Version("v1")
1111
@ShortNames("spwl")
12-
public class StatusPatchCacheWithLockCustomResource
13-
extends CustomResource<StatusPatchCacheWithLockSpec, StatusPatchCacheWithLockStatus>
14-
implements Namespaced {}
12+
public class StatusPatchCacheCustomResource
13+
extends CustomResource<StatusPatchCacheSpec, StatusPatchCacheStatus> implements Namespaced {}
Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,19 @@
1111
import static org.assertj.core.api.Assertions.assertThat;
1212
import static org.awaitility.Awaitility.await;
1313

14-
public class StatusPatchCacheWithLockIT {
14+
public class StatusPatchCacheIT {
1515

1616
public static final String TEST_1 = "test1";
1717

1818
@RegisterExtension
1919
LocallyRunOperatorExtension extension =
2020
LocallyRunOperatorExtension.builder()
21-
.withReconciler(StatusPatchCacheWithLockReconciler.class)
21+
.withReconciler(StatusPatchCacheReconciler.class)
2222
.build();
2323

2424
@Test
2525
void testStatusAlwaysUpToDate() {
26-
var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class);
26+
var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class);
2727

2828
extension.create(testResource());
2929

@@ -39,10 +39,10 @@ void testStatusAlwaysUpToDate() {
3939
});
4040
}
4141

42-
StatusPatchCacheWithLockCustomResource testResource() {
43-
var res = new StatusPatchCacheWithLockCustomResource();
42+
StatusPatchCacheCustomResource testResource() {
43+
var res = new StatusPatchCacheCustomResource();
4444
res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build());
45-
res.setSpec(new StatusPatchCacheWithLockSpec());
45+
res.setSpec(new StatusPatchCacheSpec());
4646
return res;
4747
}
4848
}
Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,14 @@
1212
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
1313

1414
@ControllerConfiguration
15-
public class StatusPatchCacheWithLockReconciler
16-
implements Reconciler<StatusPatchCacheWithLockCustomResource> {
15+
public class StatusPatchCacheReconciler implements Reconciler<StatusPatchCacheCustomResource> {
1716

1817
public volatile int latestValue = 0;
1918
public volatile boolean errorPresent = false;
2019

2120
@Override
22-
public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
23-
StatusPatchCacheWithLockCustomResource resource,
24-
Context<StatusPatchCacheWithLockCustomResource> context) {
21+
public UpdateControl<StatusPatchCacheCustomResource> reconcile(
22+
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) {
2523

2624
if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) {
2725
errorPresent = true;
@@ -50,22 +48,20 @@ public UpdateControl<StatusPatchCacheWithLockCustomResource> reconcile(
5048
}
5149

5250
@Override
53-
public List<EventSource<?, StatusPatchCacheWithLockCustomResource>> prepareEventSources(
54-
EventSourceContext<StatusPatchCacheWithLockCustomResource> context) {
51+
public List<EventSource<?, StatusPatchCacheCustomResource>> prepareEventSources(
52+
EventSourceContext<StatusPatchCacheCustomResource> context) {
5553
// periodic event triggering for testing purposes
5654
return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache()));
5755
}
5856

59-
private StatusPatchCacheWithLockCustomResource createFreshCopy(
60-
StatusPatchCacheWithLockCustomResource resource) {
61-
var res = new StatusPatchCacheWithLockCustomResource();
57+
private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) {
58+
var res = new StatusPatchCacheCustomResource();
6259
res.setMetadata(
6360
new ObjectMetaBuilder()
6461
.withName(resource.getMetadata().getName())
6562
.withNamespace(resource.getMetadata().getNamespace())
6663
.build());
67-
res.setStatus(new StatusPatchCacheWithLockStatus());
68-
64+
res.setStatus(new StatusPatchCacheStatus());
6965
return res;
7066
}
7167
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

3-
public class StatusPatchCacheWithLockSpec {
3+
public class StatusPatchCacheSpec {
44

55
private int counter = 0;
66

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
package io.javaoperatorsdk.operator.baseapi.statuscache;
22

3-
public class StatusPatchCacheWithLockStatus {
3+
public class StatusPatchCacheStatus {
44

55
private Integer value = 0;
66

77
public Integer getValue() {
88
return value;
99
}
1010

11-
public StatusPatchCacheWithLockStatus setValue(Integer value) {
11+
public StatusPatchCacheStatus setValue(Integer value) {
1212
this.value = value;
1313
return this;
1414
}

0 commit comments

Comments
 (0)