Skip to content

Commit 2417c5c

Browse files
csvirimetacosm
authored andcommitted
feat: API to check if next reconciliation is imminent (#2272)
Signed-off-by: Attila Mészáros <[email protected]>
1 parent c8ff7de commit 2417c5c

File tree

7 files changed

+180
-0
lines changed

7 files changed

+180
-0
lines changed

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java

+13
Original file line numberDiff line numberDiff line change
@@ -51,5 +51,18 @@ <R> Optional<R> getSecondaryResource(Class<R> expectedType,
5151
* @return the {@link IndexerResourceCache} associated with the associated {@link Reconciler} for
5252
* this context
5353
*/
54+
@SuppressWarnings("unused")
5455
IndexedResourceCache<P> getPrimaryCache();
56+
57+
/**
58+
* Determines whether a new reconciliation will be triggered right after the current
59+
* reconciliation is finished. This allows to optimize certain situations, helping avoid unneeded
60+
* API calls. A reconciler might, for example, skip updating the status when it's known another
61+
* reconciliation is already scheduled, which would in turn trigger another status update, thus
62+
* rendering the current one moot.
63+
*
64+
* @return {@code true} is another reconciliation is already scheduled, {@code false} otherwise
65+
**/
66+
boolean isNextReconciliationImminent();
67+
5568
}

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

+7
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
1414
import io.javaoperatorsdk.operator.processing.Controller;
1515
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
16+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1617

1718
public class DefaultContext<P extends HasMetadata> implements Context<P> {
1819

@@ -45,6 +46,12 @@ public IndexedResourceCache<P> getPrimaryCache() {
4546
return controller.getEventSourceManager().getControllerResourceEventSource();
4647
}
4748

49+
@Override
50+
public boolean isNextReconciliationImminent() {
51+
return controller.getEventProcessor()
52+
.isNextReconciliationImminent(ResourceID.fromResource(primaryResource));
53+
}
54+
4855
@Override
4956
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
5057
return controller.getEventSourceManager().getResourceEventSourcesFor(expectedType).stream()

Diff for: operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

+4
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,10 @@ public synchronized void start() throws OperatorException {
409409
handleAlreadyMarkedEvents();
410410
}
411411

412+
public boolean isNextReconciliationImminent(ResourceID resourceID) {
413+
return resourceStateManager.getOrCreate(resourceID).eventPresent();
414+
}
415+
412416
private void handleAlreadyMarkedEvents() {
413417
for (var state : resourceStateManager.resourcesWithEventPresent()) {
414418
log.debug("Handling already marked event on start. State: {}", state);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.time.Duration;
4+
5+
import org.junit.jupiter.api.Test;
6+
import org.junit.jupiter.api.extension.RegisterExtension;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
11+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
12+
import io.javaoperatorsdk.operator.sample.nextreconciliationimminent.NextReconciliationImminentCustomResource;
13+
import io.javaoperatorsdk.operator.sample.nextreconciliationimminent.NextReconciliationImminentReconciler;
14+
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.awaitility.Awaitility.await;
17+
18+
public class NextReconciliationImminentIT {
19+
20+
private static final Logger log =
21+
LoggerFactory.getLogger(NextReconciliationImminentIT.class);
22+
23+
public static final int WAIT_FOR_EVENT = 300;
24+
public static final String TEST_RESOURCE_NAME = "test1";
25+
26+
@RegisterExtension
27+
LocallyRunOperatorExtension extension =
28+
LocallyRunOperatorExtension.builder()
29+
.withReconciler(new NextReconciliationImminentReconciler())
30+
.build();
31+
32+
@Test
33+
void skippingStatusUpdateWithNextReconciliationImminent() throws InterruptedException {
34+
var resource = extension.create(testResource());
35+
36+
var reconciler = extension.getReconcilerOfType(NextReconciliationImminentReconciler.class);
37+
await().untilAsserted(() -> assertThat(reconciler.isReconciliationWaiting()).isTrue());
38+
Thread.sleep(WAIT_FOR_EVENT);
39+
40+
resource.getMetadata().getAnnotations().put("trigger", "" + System.currentTimeMillis());
41+
extension.replace(resource);
42+
Thread.sleep(WAIT_FOR_EVENT);
43+
log.info("Made change to trigger event");
44+
45+
reconciler.allowReconciliationToProceed();
46+
Thread.sleep(WAIT_FOR_EVENT);
47+
// second event arrived
48+
await().untilAsserted(() -> assertThat(reconciler.isReconciliationWaiting()).isTrue());
49+
reconciler.allowReconciliationToProceed();
50+
51+
await().pollDelay(Duration.ofMillis(WAIT_FOR_EVENT)).untilAsserted(() -> {
52+
assertThat(extension.get(NextReconciliationImminentCustomResource.class, TEST_RESOURCE_NAME)
53+
.getStatus().getUpdateNumber()).isEqualTo(1);
54+
});
55+
}
56+
57+
58+
NextReconciliationImminentCustomResource testResource() {
59+
var res = new NextReconciliationImminentCustomResource();
60+
res.setMetadata(new ObjectMetaBuilder()
61+
.withName(TEST_RESOURCE_NAME)
62+
.build());
63+
return res;
64+
}
65+
66+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("nri")
12+
public class NextReconciliationImminentCustomResource
13+
extends CustomResource<Void, NextReconciliationImminentStatus>
14+
implements Namespaced {
15+
16+
17+
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;
2+
3+
import java.util.concurrent.SynchronousQueue;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import io.javaoperatorsdk.operator.api.reconciler.Context;
10+
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
11+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
12+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
13+
14+
@ControllerConfiguration(generationAwareEventProcessing = false)
15+
public class NextReconciliationImminentReconciler
16+
implements Reconciler<NextReconciliationImminentCustomResource> {
17+
18+
private static final Logger log =
19+
LoggerFactory.getLogger(NextReconciliationImminentReconciler.class);
20+
21+
private final SynchronousQueue<Boolean> queue = new SynchronousQueue<>();
22+
private volatile boolean reconciliationWaiting = false;
23+
24+
@Override
25+
public UpdateControl<NextReconciliationImminentCustomResource> reconcile(
26+
NextReconciliationImminentCustomResource resource,
27+
Context<NextReconciliationImminentCustomResource> context) throws InterruptedException {
28+
log.info("started reconciliation");
29+
reconciliationWaiting = true;
30+
// wait long enough to get manually allowed
31+
queue.poll(120, TimeUnit.SECONDS);
32+
log.info("Continue after wait");
33+
reconciliationWaiting = false;
34+
35+
if (context.isNextReconciliationImminent()) {
36+
return UpdateControl.noUpdate();
37+
} else {
38+
if (resource.getStatus() == null) {
39+
resource.setStatus(new NextReconciliationImminentStatus());
40+
}
41+
resource.getStatus().setUpdateNumber(resource.getStatus().getUpdateNumber() + 1);
42+
log.info("Patching status");
43+
return UpdateControl.patchStatus(resource);
44+
}
45+
}
46+
47+
public void allowReconciliationToProceed() {
48+
try {
49+
queue.put(true);
50+
} catch (InterruptedException e) {
51+
throw new RuntimeException(e);
52+
}
53+
}
54+
55+
public boolean isReconciliationWaiting() {
56+
return reconciliationWaiting;
57+
}
58+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;
2+
3+
public class NextReconciliationImminentStatus {
4+
5+
private int updateNumber;
6+
7+
public int getUpdateNumber() {
8+
return updateNumber;
9+
}
10+
11+
public void setUpdateNumber(int updateNumber) {
12+
this.updateNumber = updateNumber;
13+
}
14+
}

0 commit comments

Comments
 (0)