Skip to content

feat: API to check if next reconciliation is imminent #2272

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,18 @@ <R> Optional<R> getSecondaryResource(Class<R> expectedType,
* @return the {@link IndexerResourceCache} associated with the associated {@link Reconciler} for
* this context
*/
@SuppressWarnings("unused")
IndexedResourceCache<P> getPrimaryCache();

/**
* Determines whether a new reconciliation will be triggered right after the current
* reconciliation is finished. This allows to optimize certain situations, helping avoid unneeded
* API calls. A reconciler might, for example, skip updating the status when it's known another
* reconciliation is already scheduled, which would in turn trigger another status update, thus
* rendering the current one moot.
*
* @return {@code true} is another reconciliation is already scheduled, {@code false} otherwise
**/
boolean isNextReconciliationImminent();

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class DefaultContext<P extends HasMetadata> implements Context<P> {

Expand Down Expand Up @@ -45,6 +46,12 @@ public IndexedResourceCache<P> getPrimaryCache() {
return controller.getEventSourceManager().getControllerResourceEventSource();
}

@Override
public boolean isNextReconciliationImminent() {
return controller.getEventProcessor()
.isNextReconciliationImminent(ResourceID.fromResource(primaryResource));
}

@Override
public <R> Stream<R> getSecondaryResourcesAsStream(Class<R> expectedType) {
return controller.getEventSourceManager().getResourceEventSourcesFor(expectedType).stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,10 @@ public void start() throws OperatorException {
handleAlreadyMarkedEvents();
}

public boolean isNextReconciliationImminent(ResourceID resourceID) {
return resourceStateManager.getOrCreate(resourceID).eventPresent();
}

private void handleAlreadyMarkedEvents() {
for (var state : resourceStateManager.resourcesWithEventPresent()) {
handleMarkedEventForResource(state);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package io.javaoperatorsdk.operator;

import java.time.Duration;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.nextreconciliationimminent.NextReconciliationImminentCustomResource;
import io.javaoperatorsdk.operator.sample.nextreconciliationimminent.NextReconciliationImminentReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class NextReconciliationImminentIT {

private static final Logger log =
LoggerFactory.getLogger(NextReconciliationImminentIT.class);

public static final int WAIT_FOR_EVENT = 300;
public static final String TEST_RESOURCE_NAME = "test1";

@RegisterExtension
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withReconciler(new NextReconciliationImminentReconciler())
.build();

@Test
void skippingStatusUpdateWithNextReconciliationImminent() throws InterruptedException {
var resource = extension.create(testResource());

var reconciler = extension.getReconcilerOfType(NextReconciliationImminentReconciler.class);
await().untilAsserted(() -> assertThat(reconciler.isReconciliationWaiting()).isTrue());
Thread.sleep(WAIT_FOR_EVENT);

resource.getMetadata().getAnnotations().put("trigger", "" + System.currentTimeMillis());
extension.replace(resource);
Thread.sleep(WAIT_FOR_EVENT);
log.info("Made change to trigger event");

reconciler.allowReconciliationToProceed();
Thread.sleep(WAIT_FOR_EVENT);
// second event arrived
await().untilAsserted(() -> assertThat(reconciler.isReconciliationWaiting()).isTrue());
reconciler.allowReconciliationToProceed();

await().pollDelay(Duration.ofMillis(WAIT_FOR_EVENT)).untilAsserted(() -> {
assertThat(extension.get(NextReconciliationImminentCustomResource.class, TEST_RESOURCE_NAME)
.getStatus().getUpdateNumber()).isEqualTo(1);
});
}


NextReconciliationImminentCustomResource testResource() {
var res = new NextReconciliationImminentCustomResource();
res.setMetadata(new ObjectMetaBuilder()
.withName(TEST_RESOURCE_NAME)
.build());
return res;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("nri")
public class NextReconciliationImminentCustomResource
extends CustomResource<Void, NextReconciliationImminentStatus>
implements Namespaced {



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;

@ControllerConfiguration(generationAwareEventProcessing = false)
public class NextReconciliationImminentReconciler
implements Reconciler<NextReconciliationImminentCustomResource> {

private static final Logger log =
LoggerFactory.getLogger(NextReconciliationImminentReconciler.class);

private final SynchronousQueue<Boolean> queue = new SynchronousQueue<>();
private volatile boolean reconciliationWaiting = false;

@Override
public UpdateControl<NextReconciliationImminentCustomResource> reconcile(
NextReconciliationImminentCustomResource resource,
Context<NextReconciliationImminentCustomResource> context) throws InterruptedException {
log.info("started reconciliation");
reconciliationWaiting = true;
// wait long enough to get manually allowed
queue.poll(120, TimeUnit.SECONDS);
log.info("Continue after wait");
reconciliationWaiting = false;

if (context.isNextReconciliationImminent()) {
return UpdateControl.noUpdate();
} else {
if (resource.getStatus() == null) {
resource.setStatus(new NextReconciliationImminentStatus());
}
resource.getStatus().setUpdateNumber(resource.getStatus().getUpdateNumber() + 1);
log.info("Patching status");
return UpdateControl.patchStatus(resource);
}
}

public void allowReconciliationToProceed() {
try {
queue.put(true);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public boolean isReconciliationWaiting() {
return reconciliationWaiting;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.javaoperatorsdk.operator.sample.nextreconciliationimminent;

public class NextReconciliationImminentStatus {

private int updateNumber;

public int getUpdateNumber() {
return updateNumber;
}

public void setUpdateNumber(int updateNumber) {
this.updateNumber = updateNumber;
}
}
Loading